Compare commits

..

7 Commits

Author SHA1 Message Date
a02ae3e662 fix(db): in_reply_to_id FK ON DELETE SET NULL — deleting a thought no longer blocks if it has replies
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m40s
test / unit (pull_request) Successful in 16m10s
test / integration (pull_request) Failing after 17m4s
2026-05-15 16:29:57 +02:00
e43d784c39 feat(worker): DLQ processor — exhausted events moved to failed_events with exponential retry 2026-05-15 16:26:44 +02:00
c092b9e658 feat(postgres): failed_events table and PgFailedEventStore for dead-letter queue 2026-05-15 16:23:21 +02:00
340886fcfe fix(nats): explicit consumer config, ack timeouts, unknown-event acking, delivery_count 2026-05-15 16:20:31 +02:00
75e8d349e3 fix(auth): validate JWT secret length, equalize login timing, reduce TTL to 24h 2026-05-15 16:16:58 +02:00
50a08d8ed6 docs: NATS hardening + DLQ + auth security implementation plan 2026-05-15 16:15:10 +02:00
e278e4e2cc docs: NATS hardening, DLQ, and auth security design spec 2026-05-15 16:10:11 +02:00
17 changed files with 1371 additions and 37 deletions

View File

@@ -95,7 +95,7 @@ mod tests {
#[test]
fn generate_and_validate_token() {
let svc = JwtAuthService::new("secret".into(), 3600);
let svc = JwtAuthService::new("a-secret-that-is-at-least-32-bytes!!".into(), 3600);
let id = UserId::new();
let tok = svc.generate_token(&id).unwrap();
let parsed = svc.validate_token(&tok.token).unwrap();
@@ -104,7 +104,7 @@ mod tests {
#[test]
fn invalid_token_returns_unauthorized() {
let svc = JwtAuthService::new("secret".into(), 3600);
let svc = JwtAuthService::new("a-secret-that-is-at-least-32-bytes!!".into(), 3600);
let err = svc.validate_token("not.a.token").unwrap_err();
assert!(matches!(err, DomainError::Unauthorized));
}

View File

@@ -48,6 +48,7 @@ impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
/// A message as handed over by a `MessageSource`, before it is decoded into a domain event.
pub struct RawMessage {
/// Subject the message was received on.
pub subject: String,
/// Undecoded message body; the consumer adapter parses it as a JSON `EventPayload`.
pub payload: Vec<u8>,
/// Delivery attempt number reported by the source; forwarded unchanged to `EventEnvelope`.
pub delivery_count: u64,
/// Callback that acknowledges the message. The consumer adapter also invokes it for
/// messages it cannot decode, so they are not left unacknowledged.
pub ack: Box<dyn Fn() + Send + Sync>,
/// Callback that negatively acknowledges the message.
pub nack: Box<dyn Fn() + Send + Sync>,
}
@@ -83,19 +84,22 @@ impl<S: MessageSource> EventConsumer for EventConsumerAdapter<S> {
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
Ok(p) => p,
Err(e) => {
tracing::warn!("failed to deserialize event payload: {e}");
tracing::warn!("failed to deserialize event payload — acking to prevent orphan: {e}");
(msg.ack)();
return None;
}
};
let event = match DomainEvent::try_from(payload) {
Ok(e) => e,
Err(e) => {
tracing::warn!("unknown event type: {e}");
tracing::warn!("unknown or malformed event type — acking to prevent orphan: {e}");
(msg.ack)();
return None;
}
};
Some(Ok(EventEnvelope {
event,
delivery_count: msg.delivery_count,
ack: msg.ack,
nack: msg.nack,
}))
@@ -192,6 +196,7 @@ mod tests {
let msg = RawMessage {
subject: "thoughts.created".to_string(),
payload: self.bytes.clone(),
delivery_count: 1,
ack: Box::new(|| {}),
nack: Box::new(|| {}),
};
@@ -216,6 +221,7 @@ mod tests {
let msg = RawMessage {
subject: "bad".to_string(),
payload: b"not valid json".to_vec(),
delivery_count: 1,
ack: Box::new(|| {}),
nack: Box::new(|| {}),
};

View File

@@ -10,6 +10,13 @@ const STREAM_SUBJECT: &str = "thoughts-events.>";
const CONSUMER_NAME: &str = "worker";
const MAX_MESSAGES: i64 = 100_000;
/// Maximum NATS delivery attempts before a message is considered exhausted.
pub const CONSUMER_MAX_DELIVER: i64 = 5;
/// How long NATS waits for an ack before redelivering.
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
/// Timeout for spawned ack/nack async tasks.
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
fn stream_config() -> StreamConfig {
StreamConfig {
name: STREAM_NAME.to_string(),
@@ -121,6 +128,10 @@ impl MessageSource for NatsMessageSource {
CONSUMER_NAME,
jetstream::consumer::pull::Config {
durable_name: Some(CONSUMER_NAME.to_string()),
deliver_policy: jetstream::consumer::DeliverPolicy::New,
ack_policy: jetstream::consumer::AckPolicy::Explicit,
ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
max_deliver: CONSUMER_MAX_DELIVER,
// No filter_subject — consume everything from the stream.
// filter_subject matching the stream's own wildcard can be
// inconsistent across NATS server versions.
@@ -164,25 +175,48 @@ impl MessageSource for NatsMessageSource {
let subject = msg.subject.to_string();
let payload = msg.payload.to_vec();
let delivery_count = msg
.info()
.map(|info| info.delivered.max(0) as u64)
.unwrap_or(1);
let msg = Arc::new(msg);
let msg_nack = Arc::clone(&msg);
let raw = RawMessage {
subject,
payload,
delivery_count,
ack: Box::new(move || {
let m = Arc::clone(&msg);
tokio::spawn(async move {
if let Err(e) = m.ack().await {
tracing::warn!("NATS ack failed: {e}");
let result = tokio::time::timeout(
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
m.ack(),
)
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
Err(_) => tracing::warn!(
"NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s"
),
}
});
}),
nack: Box::new(move || {
let m = Arc::clone(&msg_nack);
tokio::spawn(async move {
if let Err(e) = m.ack_with(AckKind::Nak(None)).await {
tracing::warn!("NATS nak failed: {e}");
let result = tokio::time::timeout(
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
m.ack_with(AckKind::Nak(None)),
)
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"),
Err(_) => tracing::warn!(
"NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s"
),
}
});
}),

View File

@@ -7,6 +7,7 @@ edition = "2021"
domain = { workspace = true }
sqlx = { workspace = true }
uuid = { workspace = true }
serde_json = { workspace = true }
chrono = { workspace = true }
async-trait = { workspace = true }
thiserror = { workspace = true }

View File

@@ -0,0 +1,15 @@
-- Dead-letter queue: one row per event that the worker gave up on and
-- handed over to the DLQ processor for delayed republishing.
CREATE TABLE failed_events (
id UUID NOT NULL DEFAULT gen_random_uuid(),
-- Event type label stored alongside the payload (the worker writes the payload's subject here).
event_type TEXT NOT NULL,
-- Serialized event payload; the DLQ processor deserializes it again to republish.
payload JSONB NOT NULL,
-- When the row was inserted into the DLQ.
failed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- Earliest time the row is eligible for the next republish attempt.
retry_at TIMESTAMPTZ NOT NULL,
-- Number of republish attempts recorded so far.
retry_count INT NOT NULL DEFAULT 0,
-- Error text of the most recent failure (or the outcome note written by the store).
last_error TEXT NOT NULL,
CONSTRAINT failed_events_pkey PRIMARY KEY (id)
);
-- Partial index over rows that can still be retried.
-- NOTE: the literal 3 mirrors DLQ_MAX_RETRIES in failed_event.rs; the polling
-- query filters on "retry_count < $1" bound to that constant, so the two must
-- be changed together.
CREATE INDEX failed_events_due_idx
ON failed_events (retry_at)
WHERE retry_count < 3;

View File

@@ -0,0 +1,11 @@
-- Change the in_reply_to_id FK from the default delete action (NO ACTION in
-- PostgreSQL, which rejects the delete much like RESTRICT) to SET NULL.
-- Previously, deleting a thought that had replies raised a FK violation.
-- With SET NULL, deleting a thought orphans its replies (they survive but
-- lose their parent reference), which is the correct semantic for a
-- threaded social app.
-- NOTE(review): assumes the existing constraint carries the default name
-- thoughts_in_reply_to_id_fkey. Because of IF EXISTS, a differently named
-- constraint would be silently kept and still block deletes — confirm.
ALTER TABLE thoughts
DROP CONSTRAINT IF EXISTS thoughts_in_reply_to_id_fkey;
-- Re-create the self-referencing FK with the new delete behaviour.
ALTER TABLE thoughts
ADD CONSTRAINT thoughts_in_reply_to_id_fkey
FOREIGN KEY (in_reply_to_id) REFERENCES thoughts(id) ON DELETE SET NULL;

View File

@@ -0,0 +1,105 @@
use chrono::{DateTime, Utc};
use sqlx::PgPool;
/// Upper bound on republish attempts the DLQ processor makes for a single failed event.
pub const DLQ_MAX_RETRIES: i32 = 3;
/// Delay in seconds before the first DLQ retry (5 minutes); each later retry doubles it.
pub const DLQ_INITIAL_BACKOFF_SECS: i64 = 5 * 60;
/// Seconds between two consecutive DLQ polls for rows that are due.
pub const DLQ_POLL_INTERVAL_SECS: u64 = 60;
/// One row of the `failed_events` dead-letter table, as returned by
/// `PgFailedEventStore::poll_due`.
#[derive(sqlx::FromRow)]
pub struct FailedEvent {
/// Primary key of the row.
pub id: uuid::Uuid,
/// Event type label stored with the row.
pub event_type: String,
/// Serialized event payload (JSONB column).
pub payload: serde_json::Value,
/// When the row was inserted into the DLQ.
pub failed_at: DateTime<Utc>,
/// Earliest time the row is eligible for another republish attempt.
pub retry_at: DateTime<Utc>,
/// Number of republish attempts recorded so far.
pub retry_count: i32,
/// Text recorded for the most recent attempt.
pub last_error: String,
}
pub struct PgFailedEventStore {
pool: PgPool,
}
impl PgFailedEventStore {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
/// Insert a newly exhausted event into the DLQ.
pub async fn insert(
&self,
event_type: &str,
payload: &serde_json::Value,
last_error: &str,
) -> Result<(), sqlx::Error> {
let retry_at = Utc::now() + chrono::Duration::seconds(DLQ_INITIAL_BACKOFF_SECS);
sqlx::query(
"INSERT INTO failed_events \
(event_type, payload, retry_at, last_error) \
VALUES ($1, $2, $3, $4)",
)
.bind(event_type)
.bind(payload)
.bind(retry_at)
.bind(last_error)
.execute(&self.pool)
.await?;
Ok(())
}
/// Fetch all events due for retry (retry_at <= now, retry_count < DLQ_MAX_RETRIES).
pub async fn poll_due(&self) -> Result<Vec<FailedEvent>, sqlx::Error> {
sqlx::query_as::<_, FailedEvent>(
"SELECT id, event_type, payload, failed_at, retry_at, retry_count, last_error \
FROM failed_events \
WHERE retry_at <= now() AND retry_count < $1 \
ORDER BY retry_at \
LIMIT 100",
)
.bind(DLQ_MAX_RETRIES)
.fetch_all(&self.pool)
.await
}
/// Advance a row after a republish attempt using exponential backoff.
/// next_retry = now + initial * 2^retry_count
pub async fn advance(&self, id: uuid::Uuid, error: Option<&str>) -> Result<(), sqlx::Error> {
let current: i32 =
sqlx::query_scalar("SELECT retry_count FROM failed_events WHERE id = $1")
.bind(id)
.fetch_one(&self.pool)
.await?;
let new_count = current + 1;
let backoff_secs = DLQ_INITIAL_BACKOFF_SECS * (1_i64 << new_count.min(10));
let retry_at = Utc::now() + chrono::Duration::seconds(backoff_secs);
let last_error = error.unwrap_or("republish succeeded");
sqlx::query(
"UPDATE failed_events \
SET retry_count = $1, retry_at = $2, last_error = $3 \
WHERE id = $4",
)
.bind(new_count)
.bind(retry_at)
.bind(last_error)
.bind(id)
.execute(&self.pool)
.await?;
Ok(())
}
/// Park a permanently failed event (retry_count >= DLQ_MAX_RETRIES).
pub async fn park_permanently(&self, id: uuid::Uuid) -> Result<(), sqlx::Error> {
let far_future = Utc::now() + chrono::Duration::days(365);
sqlx::query("UPDATE failed_events SET retry_at = $1 WHERE id = $2")
.bind(far_future)
.bind(id)
.execute(&self.pool)
.await?;
Ok(())
}
}

View File

@@ -3,6 +3,7 @@ pub mod api_key;
pub mod block;
pub mod boost;
mod db_error;
pub mod failed_event;
pub mod feed;
pub mod follow;
pub mod like;

View File

@@ -64,10 +64,15 @@ pub async fn login(
input: LoginInput,
) -> Result<LoginOutput, DomainError> {
let email = Email::new(input.email)?;
let user = users
.find_by_email(&email)
.await?
.ok_or(DomainError::Unauthorized)?;
let user = users.find_by_email(&email).await?;
if user.is_none() {
// Timing equalization — prevents email enumeration via response-time oracle.
// Running the hasher on a miss makes "no such user" take the same time as
// "wrong password", so attackers cannot distinguish the two cases.
let _ = hasher.hash(&input.password).await;
return Err(DomainError::Unauthorized);
}
let user = user.unwrap();
if !hasher.verify(&input.password, &user.password_hash).await? {
return Err(DomainError::Unauthorized);
}

View File

@@ -1,4 +1,5 @@
const JWT_TTL_SECS: i64 = 86_400 * 30;
const JWT_TTL_SECS: i64 = 86_400; // 24 hours (was 30 days)
const JWT_SECRET_MIN_BYTES: usize = 32; // 256 bits minimum for HS256
use async_trait::async_trait;
use sqlx::PgPool;
@@ -107,10 +108,16 @@ pub async fn build(cfg: &Config) -> Infrastructure {
)),
feed: Arc::new(postgres::feed::PgFeedRepository::new(pool.clone())),
search: Arc::new(postgres_search::PgSearchRepository::new(pool.clone())),
auth: Arc::new(auth::JwtAuthService::new(
cfg.jwt_secret.clone(),
JWT_TTL_SECS,
)),
auth: Arc::new({
if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES {
panic!(
"JWT_SECRET is {} bytes — minimum is {} bytes for HS256 security",
cfg.jwt_secret.len(),
JWT_SECRET_MIN_BYTES,
);
}
auth::JwtAuthService::new(cfg.jwt_secret.clone(), JWT_TTL_SECS)
}),
hasher: Arc::new(auth::Argon2PasswordHasher),
events: event_publisher,
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,

View File

@@ -72,6 +72,7 @@ pub enum DomainEvent {
/// A decoded domain event together with the callbacks used to settle it with the transport.
pub struct EventEnvelope {
/// The decoded event.
pub event: DomainEvent,
/// Delivery attempt number copied from the transport's raw message; the worker compares
/// it with the consumer's maximum delivery count to detect the last attempt.
pub delivery_count: u64,
/// Callback that acknowledges the underlying message.
pub ack: Box<dyn Fn() + Send + Sync>,
/// Callback that negatively acknowledges the underlying message.
pub nack: Box<dyn Fn() + Send + Sync>,
}
@@ -79,6 +80,7 @@ impl std::fmt::Debug for EventEnvelope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    // Only the data fields are printed; the boxed ack/nack closures have no Debug
    // representation and are left out.
    let mut out = f.debug_struct("EventEnvelope");
    out.field("event", &self.event);
    out.field("delivery_count", &self.delivery_count);
    out.finish()
}
}

View File

@@ -12,6 +12,7 @@ domain = { workspace = true }
application = { workspace = true }
nats = { workspace = true }
event-transport = { workspace = true }
event-payload = { workspace = true }
activitypub-base = { workspace = true }
activitypub = { workspace = true }
postgres = { workspace = true }
@@ -22,6 +23,7 @@ futures = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
dotenvy = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
[dev-dependencies]

64
crates/worker/src/dlq.rs Normal file
View File

@@ -0,0 +1,64 @@
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use postgres::failed_event::{PgFailedEventStore, DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS};
use std::sync::Arc;
/// Background task: polls `failed_events` and republishes due rows to the event bus.
pub async fn run_dlq_processor(store: Arc<PgFailedEventStore>, publisher: Arc<dyn EventPublisher>) {
let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS);
loop {
tokio::time::sleep(interval).await;
if let Err(e) = process_due(&store, &*publisher).await {
tracing::error!("DLQ processor error: {e}");
}
}
}
/// Runs one DLQ pass: loads the rows that are due and tries to republish each of them.
///
/// Every attempt, successful or not, is recorded through `store.advance`.
///
/// # Errors
/// Returns the first `sqlx::Error` raised by the store; rows after the failing one
/// are not touched in this pass.
async fn process_due(
    store: &PgFailedEventStore,
    publisher: &dyn EventPublisher,
) -> Result<(), sqlx::Error> {
    let batch = store.poll_due().await?;
    if batch.is_empty() {
        return Ok(());
    }

    tracing::info!(count = batch.len(), "DLQ: processing due events");

    for row in batch {
        if row.retry_count >= DLQ_MAX_RETRIES {
            // NOTE(review): `poll_due` already filters on `retry_count < DLQ_MAX_RETRIES`
            // in SQL, so this looks like a purely defensive guard — confirm.
            tracing::error!(
                id = %row.id,
                event_type = %row.event_type,
                retry_count = row.retry_count,
                "DLQ: event permanently failed — parking",
            );
            store.park_permanently(row.id).await?;
        } else {
            // NOTE(review): a successful republish is also passed to `advance`, which
            // schedules the row for another attempt — confirm the row is not meant to
            // be resolved or removed once republishing succeeds.
            let failure = match republish(&row.payload, publisher).await {
                Ok(()) => {
                    tracing::info!(id = %row.id, "DLQ: republished successfully");
                    None
                }
                Err(e) => {
                    tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed");
                    Some(e.to_string())
                }
            };
            store.advance(row.id, failure.as_deref()).await?;
        }
    }

    Ok(())
}
async fn republish(
payload: &serde_json::Value,
publisher: &dyn EventPublisher,
) -> Result<(), DomainError> {
use event_payload::EventPayload;
let ep: EventPayload = serde_json::from_value(payload.clone())
.map_err(|e| DomainError::Internal(format!("DLQ deserialize: {e}")))?;
let event = DomainEvent::try_from(ep)
.map_err(|e| DomainError::Internal(format!("DLQ event conversion: {e}")))?;
publisher.publish(&event).await
}

View File

@@ -1,10 +1,11 @@
use postgres::failed_event::PgFailedEventStore;
use sqlx::PgPool;
use std::sync::Arc;
use activitypub::ThoughtsObjectHandler;
use activitypub_base::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService};
use domain::ports::{ActivityPubRepository, OutboundFederationPort};
use domain::ports::{ActivityPubRepository, EventPublisher, OutboundFederationPort};
use postgres::activitypub::PgActivityPubRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
@@ -15,14 +16,14 @@ pub struct WorkerHandlers {
pub federation: FederationHandler,
}
pub async fn build(
database_url: &str,
base_url: &str,
nats_url: &str,
) -> (
event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
WorkerHandlers,
) {
/// Everything the worker binary needs at runtime, as assembled by `build`.
pub struct WorkerInfra {
/// Event consumer backed by the NATS message source.
pub consumer: event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
/// Event handlers invoked by the worker's main loop.
pub handlers: WorkerHandlers,
/// Shared handle to the Postgres dead-letter store.
pub dlq_store: Arc<PgFailedEventStore>,
/// Publisher over the NATS transport; handed to the DLQ processor for republishing.
pub event_publisher: Arc<dyn EventPublisher>,
}
pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> WorkerInfra {
let pool = PgPool::connect(database_url)
.await
.expect("DB connect failed");
@@ -83,15 +84,27 @@ pub async fn build(
},
};
// NATS consumer
// DLQ store
let dlq_store = Arc::new(PgFailedEventStore::new(pool));
// NATS consumer + publisher
let nats_client = async_nats::connect(nats_url)
.await
.expect("NATS connect failed");
nats::ensure_stream(&nats_client)
.await
.expect("JetStream stream setup failed");
let consumer =
event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client));
let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(
nats_client.clone(),
));
let event_publisher: Arc<dyn EventPublisher> = Arc::new(
event_transport::EventPublisherAdapter::new(nats::NatsTransport::new(nats_client)),
);
(consumer, handlers)
WorkerInfra {
consumer,
handlers,
dlq_store,
event_publisher,
}
}

View File

@@ -1,8 +1,10 @@
mod dlq;
mod factory;
mod handlers;
use domain::ports::EventConsumer;
use futures::StreamExt;
use nats::CONSUMER_MAX_DELIVER;
#[tokio::main]
async fn main() {
@@ -16,29 +18,66 @@ async fn main() {
let base_url = std::env::var("BASE_URL").expect("BASE_URL required");
tracing::info!("Building worker...");
let (consumer, handlers) = factory::build(&database_url, &base_url, &nats_url).await;
let infra = factory::build(&database_url, &base_url, &nats_url).await;
// Spawn DLQ processor as a background task.
tokio::spawn(dlq::run_dlq_processor(
infra.dlq_store.clone(),
infra.event_publisher.clone(),
));
tracing::info!("Worker started, consuming events...");
let mut stream = consumer.consume();
let mut stream = infra.consumer.consume();
while let Some(result) = stream.next().await {
match result {
Ok(envelope) => {
let event = &envelope.event;
tracing::debug!(?event, "received event");
let n = handlers.notification.handle(event).await;
let f = handlers.federation.handle(event).await;
let n = infra.handlers.notification.handle(event).await;
let f = infra.handlers.federation.handle(event).await;
if n.is_ok() && f.is_ok() {
(envelope.ack)();
} else {
if let Err(e) = n {
if let Err(e) = &n {
tracing::error!("notification handler: {e}");
}
if let Err(e) = f {
if let Err(e) = &f {
tracing::error!("federation handler: {e}");
}
(envelope.nack)();
// Last delivery attempt -> move to DLQ then ack.
// Earlier attempts -> nack so NATS retries.
if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
let error_msg = n
.err()
.or(f.err())
.map(|e| e.to_string())
.unwrap_or_else(|| "unknown error".into());
// Serialize event back to payload for storage.
let ep = event_payload::EventPayload::from(event);
let event_type = ep.subject().to_string();
let payload = serde_json::to_value(&ep).unwrap_or(serde_json::Value::Null);
if let Err(e) = infra
.dlq_store
.insert(&event_type, &payload, &error_msg)
.await
{
tracing::error!("DLQ insert failed: {e} — message lost");
} else {
tracing::warn!(
event_type,
delivery_count = envelope.delivery_count,
"event exhausted — moved to DLQ"
);
}
(envelope.ack)(); // ack from NATS — DLQ owns it now
} else {
(envelope.nack)();
}
}
}
Err(e) => tracing::error!("consumer error: {e}"),

View File

@@ -0,0 +1,836 @@
# NATS Hardening, Dead-Letter Queue & Auth Security
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Harden the NATS consumer (explicit config, ack timeouts, unknown-event acking), add an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps (weak secret validation, timing oracle, excessive JWT TTL).
**Architecture:** Seven sequential tasks. Tasks 1–3 are independent of each other and of Tasks 4–7 — they can be reviewed in any order. Tasks 4–7 form a dependency chain: delivery-count metadata (Task 4) → migration (Task 5) → Postgres store (Task 6) → worker DLQ loop (Task 7).
**Tech Stack:** Rust, Tokio, async-nats 0.48, SQLx, Postgres, argon2, jsonwebtoken.
---
### Task 1: Auth hardening — secret validation, timing equalization, TTL
**Files:**
- Modify: `crates/bootstrap/src/factory.rs`
- Modify: `crates/application/src/use_cases/auth.rs`
- Modify: `crates/adapters/auth/src/lib.rs`
- [ ] **Step 1: Replace the magic JWT TTL constant and add secret length check in `factory.rs`**
In `crates/bootstrap/src/factory.rs`, find the current `const JWT_TTL_SECS: i64 = 86_400 * 30;` at the top of the file. Replace it and add a secret minimum constant, then add validation before the `JwtAuthService` is constructed:
```rust
const JWT_TTL_SECS: i64 = 86_400; // 24 hours
const JWT_SECRET_MIN_BYTES: usize = 32; // 256 bits minimum for HS256
```
Then, just before `auth: Arc::new(auth::JwtAuthService::new(...))` in the `build` function, add:
```rust
if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES {
panic!(
"JWT_SECRET is {} bytes — minimum is {} bytes for HS256 security",
cfg.jwt_secret.len(),
JWT_SECRET_MIN_BYTES,
);
}
```
- [ ] **Step 2: Update the auth test to use a 32-byte secret**
In `crates/adapters/auth/src/lib.rs`, find the tests that use `"secret".into()`:
```rust
// Before (lines ~98, ~107):
let svc = JwtAuthService::new("secret".into(), 3600);
// After (use 32+ byte secret):
let svc = JwtAuthService::new("a-secret-that-is-at-least-32-bytes".into(), 3600);
```
Update both test cases (`generate_and_validate_token` and `invalid_token_returns_unauthorized`).
- [ ] **Step 3: Add timing equalization to the login use case**
In `crates/application/src/use_cases/auth.rs`, find the `login` function. Currently it early-returns when the user is not found:
```rust
let user = users
.find_by_email(&email)
.await?
.ok_or(DomainError::Unauthorized)?;
```
Replace with a timing-safe version that runs the hasher even when no user is found:
```rust
let user = users.find_by_email(&email).await?;
if user.is_none() {
// Timing equalization — prevents email enumeration via response-time oracle.
// Running the hasher on a miss makes "no such user" take the same time as
// "wrong password", so attackers cannot distinguish the two cases.
let _ = hasher.hash(&input.password).await;
return Err(DomainError::Unauthorized);
}
let user = user.unwrap();
```
- [ ] **Step 4: Compile check**
```bash
cargo check --workspace 2>&1 | head -20
```
Expected: 0 errors.
- [ ] **Step 5: Commit**
```bash
git add crates/bootstrap/src/factory.rs \
crates/application/src/use_cases/auth.rs \
crates/adapters/auth/src/lib.rs
git commit -m "fix(auth): validate JWT secret length, equalize login timing, reduce TTL to 24h"
```
---
### Task 2: NATS consumer hardening — explicit config + ack timeouts
**Files:**
- Modify: `crates/adapters/nats/src/lib.rs`
- [ ] **Step 1: Add named constants**
At the top of `crates/adapters/nats/src/lib.rs`, after the existing constants, add:
```rust
/// Maximum delivery attempts before a message is considered exhausted.
/// The DLQ processor picks it up after this point.
const CONSUMER_MAX_DELIVER: i64 = 5;
/// How long NATS waits for an ack before redelivering.
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
/// Timeout for the spawned ack/nack async task.
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
```
- [ ] **Step 2: Replace the pull consumer config**
Find the `get_or_create_consumer` call (around line 119). Replace `..Default::default()` with explicit settings:
```rust
let consumer = match stream
.get_or_create_consumer(
CONSUMER_NAME,
jetstream::consumer::pull::Config {
durable_name: Some(CONSUMER_NAME.to_string()),
deliver_policy: jetstream::consumer::DeliverPolicy::New,
ack_policy: jetstream::consumer::AckPolicy::Explicit,
ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
max_deliver: CONSUMER_MAX_DELIVER,
..Default::default()
},
)
.await
```
You will need to add the `DeliverPolicy` and `AckPolicy` imports. Check what's already imported at the top of the file and add if needed:
```rust
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
```
- [ ] **Step 3: Add timeout to the ack task**
Find the `ack:` closure (around line 173). Replace it with a timeout-wrapped version:
```rust
ack: Box::new(move || {
let m = Arc::clone(&msg);
tokio::spawn(async move {
let result = tokio::time::timeout(
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
m.ack(),
)
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
Err(_) => tracing::warn!("NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s"),
}
});
}),
```
- [ ] **Step 4: Add timeout to the nack task**
Find the `nack:` closure (around line 181). Same pattern:
```rust
nack: Box::new(move || {
let m = Arc::clone(&msg_nack);
tokio::spawn(async move {
let result = tokio::time::timeout(
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
m.ack_with(AckKind::Nak(None)),
)
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"),
Err(_) => tracing::warn!("NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s"),
}
});
}),
```
- [ ] **Step 5: Expose delivery count on RawMessage**
Find where `RawMessage` is constructed (around line 170). Add the NATS message's delivery count before constructing it. Read the NATS message metadata via `msg.info()`:
```rust
let delivery_count = msg
.info()
.map(|info| info.delivered)
.unwrap_or(1) as u64;
let raw = RawMessage {
subject,
payload,
delivery_count,
ack: Box::new(move || { ... }),
nack: Box::new(move || { ... }),
};
```
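Note: `msg.info()` returns `Result<Info, _>`. In async-nats, `Info.delivered` is a signed integer (`i64`), so clamp it to a non-negative value (e.g. `info.delivered.max(0)`) before casting to `u64`. If the metadata is unavailable, default to 1.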
- [ ] **Step 6: Compile check**
```bash
cargo check -p nats 2>&1 | head -20
```
Expected: compile errors about `RawMessage` missing `delivery_count` — that's fixed in Task 3.
- [ ] **Step 7: Commit is deferred to after Task 3** (they touch the same type)
---
### Task 3: Add `delivery_count` to `RawMessage` and `EventEnvelope`; ack unknown events
**Files:**
- Modify: `crates/adapters/event-transport/src/lib.rs`
- Modify: `crates/domain/src/events.rs`
- [ ] **Step 1: Add `delivery_count` to `RawMessage`**
In `crates/adapters/event-transport/src/lib.rs`, find the `RawMessage` struct (around line 48). Add the field:
```rust
pub struct RawMessage {
pub subject: String,
pub payload: Vec<u8>,
pub delivery_count: u64, // NEW
pub ack: Box<dyn Fn() + Send + Sync>,
pub nack: Box<dyn Fn() + Send + Sync>,
}
```
- [ ] **Step 2: Add `delivery_count` to `EventEnvelope` in domain**
In `crates/domain/src/events.rs`, find `EventEnvelope` (around line 83). Add the field:
```rust
pub struct EventEnvelope {
pub event: DomainEvent,
pub delivery_count: u64, // NEW
pub ack: Box<dyn Fn() + Send + Sync>,
pub nack: Box<dyn Fn() + Send + Sync>,
}
```
Also update the `Debug` impl (which is manual because closures aren't Debug). Find it and add `delivery_count` to the struct debug output:
```rust
impl std::fmt::Debug for EventEnvelope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventEnvelope")
.field("event", &self.event)
.field("delivery_count", &self.delivery_count)
.finish()
}
}
```
- [ ] **Step 3: Pass `delivery_count` through `EventConsumerAdapter`**
In `crates/adapters/event-transport/src/lib.rs`, find where `EventEnvelope` is constructed in the `consume()` method (around line 97). Update it:
```rust
Some(Ok(EventEnvelope {
event,
delivery_count: msg.delivery_count, // NEW
ack: msg.ack,
nack: msg.nack,
}))
```
- [ ] **Step 4: Ack unknown event types instead of orphaning them**
In the same `consume()` method, find the `Err(e)` arm for unknown event types (around line 92):
```rust
Err(e) => {
tracing::warn!("unknown event type: {e}");
return None;
}
```
Replace with an explicit ack before dropping:
```rust
Err(e) => {
tracing::warn!("unknown or malformed event type — acking to prevent orphan: {e}");
(msg.ack)();
return None;
}
```
Similarly for the deserialization error arm (around line 85):
```rust
Err(e) => {
tracing::warn!("failed to deserialize event payload — acking to prevent orphan: {e}");
(msg.ack)();
return None;
}
```
- [ ] **Step 5: Update test stubs for `RawMessage`**
In the tests inside `event-transport/src/lib.rs`, `RawMessage` is constructed with `ack` and `nack`. Add `delivery_count: 1` to each:
```rust
let msg = RawMessage {
subject: "thoughts.created".to_string(),
payload: self.bytes.clone(),
delivery_count: 1,
ack: Box::new(|| {}),
nack: Box::new(|| {}),
};
```
Find all `RawMessage { ... }` constructions in the test module and add `delivery_count: 1`.
- [ ] **Step 6: Full workspace compile check**
```bash
cargo check --workspace 2>&1 | head -40
```
Fix any remaining construction sites for `RawMessage` or `EventEnvelope` that are missing `delivery_count`.
- [ ] **Step 7: Commit Tasks 2 and 3 together**
```bash
git add crates/adapters/nats/src/lib.rs \
crates/adapters/event-transport/src/lib.rs \
crates/domain/src/events.rs
git commit -m "fix(nats): explicit consumer config, ack timeouts, unknown-event acking, delivery_count"
```
---
### Task 4: `failed_events` migration
**Files:**
- Create: `crates/adapters/postgres/migrations/009_failed_events.sql`
- [ ] **Step 1: Create the migration file**
Create `crates/adapters/postgres/migrations/009_failed_events.sql`:
```sql
CREATE TABLE failed_events (
id UUID NOT NULL DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
failed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
retry_at TIMESTAMPTZ NOT NULL,
retry_count INT NOT NULL DEFAULT 0,
last_error TEXT NOT NULL,
CONSTRAINT failed_events_pkey PRIMARY KEY (id)
);
-- Partial index: only rows actively due for retry are in this index.
CREATE INDEX failed_events_due_idx
ON failed_events (retry_at)
WHERE retry_count < 3;
```
- [ ] **Step 2: Verify the migration file is syntactically correct**
```bash
cargo check -p postgres 2>&1 | head -10
```
(The postgres crate auto-discovers migrations via `sqlx::migrate!` — the file just needs to exist with valid SQL. Syntax is validated at runtime in integration tests.)
- [ ] **Step 3: Commit**
```bash
git add crates/adapters/postgres/migrations/009_failed_events.sql
git commit -m "feat(db): add failed_events table for dead-letter queue"
```
---
### Task 5: `PgFailedEventStore` — Postgres DLQ repository
**Files:**
- Create: `crates/adapters/postgres/src/failed_event.rs`
- Modify: `crates/adapters/postgres/src/lib.rs`
- [ ] **Step 1: Create `failed_event.rs`**
Create `crates/adapters/postgres/src/failed_event.rs`:
```rust
use crate::db_error::IntoDbResult;
use chrono::{DateTime, Utc};
use sqlx::PgPool;
/// How many times a failed event is retried by the DLQ processor.
pub const DLQ_MAX_RETRIES: i32 = 3;
/// Quarantine period for the first DLQ retry (seconds). Doubles each retry.
pub const DLQ_INITIAL_BACKOFF_SECS: i64 = 300; // 5 minutes
/// How often the DLQ processor polls for due retries (seconds).
pub const DLQ_POLL_INTERVAL_SECS: u64 = 60;
#[derive(sqlx::FromRow)]
pub struct FailedEvent {
pub id: uuid::Uuid,
pub event_type: String,
pub payload: serde_json::Value,
pub failed_at: DateTime<Utc>,
pub retry_at: DateTime<Utc>,
pub retry_count: i32,
pub last_error: String,
}
pub struct PgFailedEventStore {
pool: PgPool,
}
impl PgFailedEventStore {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
/// Insert a newly exhausted event into the DLQ.
pub async fn insert(
&self,
event_type: &str,
payload: &serde_json::Value,
last_error: &str,
) -> Result<(), sqlx::Error> {
let retry_at = Utc::now()
+ chrono::Duration::seconds(DLQ_INITIAL_BACKOFF_SECS);
sqlx::query(
"INSERT INTO failed_events \
(event_type, payload, retry_at, last_error) \
VALUES ($1, $2, $3, $4)",
)
.bind(event_type)
.bind(payload)
.bind(retry_at)
.bind(last_error)
.execute(&self.pool)
.await?;
Ok(())
}
/// Fetch all events due for retry (retry_at <= now, retry_count < DLQ_MAX_RETRIES).
pub async fn poll_due(&self) -> Result<Vec<FailedEvent>, sqlx::Error> {
sqlx::query_as::<_, FailedEvent>(
"SELECT id, event_type, payload, failed_at, retry_at, retry_count, last_error \
FROM failed_events \
WHERE retry_at <= now() AND retry_count < $1 \
ORDER BY retry_at \
LIMIT 100",
)
.bind(DLQ_MAX_RETRIES)
.fetch_all(&self.pool)
.await
}
/// Advance a row after a republish attempt.
/// Uses exponential backoff: next_retry = now + initial * 2^retry_count.
pub async fn advance(
&self,
id: uuid::Uuid,
error: Option<&str>,
) -> Result<(), sqlx::Error> {
// Fetch current retry_count to compute backoff.
let current: i32 = sqlx::query_scalar(
"SELECT retry_count FROM failed_events WHERE id = $1",
)
.bind(id)
.fetch_one(&self.pool)
.await?;
let new_count = current + 1;
let backoff_secs = DLQ_INITIAL_BACKOFF_SECS * (1_i64 << new_count.min(10));
let retry_at = Utc::now() + chrono::Duration::seconds(backoff_secs);
let last_error = error.unwrap_or("republish succeeded");
sqlx::query(
"UPDATE failed_events \
SET retry_count = $1, retry_at = $2, last_error = $3 \
WHERE id = $4",
)
.bind(new_count)
.bind(retry_at)
.bind(last_error)
.bind(id)
.execute(&self.pool)
.await?;
Ok(())
}
/// Park a permanently failed event (retry_count >= DLQ_MAX_RETRIES).
/// Sets retry_at 1 year out so it falls out of the active index.
pub async fn park_permanently(&self, id: uuid::Uuid) -> Result<(), sqlx::Error> {
let far_future = Utc::now() + chrono::Duration::days(365);
sqlx::query(
"UPDATE failed_events SET retry_at = $1 WHERE id = $2",
)
.bind(far_future)
.bind(id)
.execute(&self.pool)
.await?;
Ok(())
}
}
```
- [ ] **Step 2: Export from `postgres/src/lib.rs`**
In `crates/adapters/postgres/src/lib.rs`, add:
```rust
pub mod failed_event;
```
- [ ] **Step 3: Compile check**
```bash
cargo check -p postgres 2>&1 | head -20
```
Expected: 0 errors.
- [ ] **Step 4: Commit**
```bash
git add crates/adapters/postgres/src/failed_event.rs \
crates/adapters/postgres/src/lib.rs
git commit -m "feat(postgres): PgFailedEventStore for dead-letter queue"
```
---
### Task 6: DLQ processor in worker
**Files:**
- Create: `crates/worker/src/dlq.rs`
- Modify: `crates/worker/src/factory.rs`
- Modify: `crates/worker/src/main.rs`
The `delivery_count` on `EventEnvelope` (added in Task 3) tells the worker when a message is on its last attempt. The main loop inserts to the DLQ when handlers fail at `delivery_count >= CONSUMER_MAX_DELIVER`. A separate background task polls the DLQ and republishes due events.
- [ ] **Step 1: Create `dlq.rs`**
Create `crates/worker/src/dlq.rs`:
```rust
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use postgres::failed_event::{
DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS, PgFailedEventStore,
};
use std::sync::Arc;
/// Background task: polls `failed_events` and republishes due rows to the event bus.
/// Runs on a fixed interval for the lifetime of the worker.
pub async fn run_dlq_processor(
store: Arc<PgFailedEventStore>,
publisher: Arc<dyn EventPublisher>,
) {
let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS);
loop {
tokio::time::sleep(interval).await;
if let Err(e) = process_due(&store, &*publisher).await {
tracing::error!("DLQ processor error: {e}");
}
}
}
async fn process_due(
store: &PgFailedEventStore,
publisher: &dyn EventPublisher,
) -> Result<(), sqlx::Error> {
let due = store.poll_due().await?;
if due.is_empty() {
return Ok(());
}
tracing::info!(count = due.len(), "DLQ: processing due events");
for row in due {
if row.retry_count >= DLQ_MAX_RETRIES {
tracing::error!(
id = %row.id,
event_type = %row.event_type,
retry_count = row.retry_count,
"DLQ: event permanently failed — parking",
);
store.park_permanently(row.id).await?;
continue;
}
// Attempt to republish the raw payload as a domain event.
let republish_result = republish(&row.payload, publisher).await;
match republish_result {
Ok(()) => {
tracing::info!(id = %row.id, "DLQ: republished successfully");
store.advance(row.id, None).await?;
}
Err(e) => {
tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed");
store.advance(row.id, Some(&e.to_string())).await?;
}
}
}
Ok(())
}
/// Rebuild a domain event from a stored DLQ payload and publish it.
///
/// Deserialization and conversion failures are reported as
/// `DomainError::Internal`; a publish failure is returned unchanged.
async fn republish(
    payload: &serde_json::Value,
    publisher: &dyn EventPublisher,
) -> Result<(), DomainError> {
    use event_payload::EventPayload;

    let decoded = match serde_json::from_value::<EventPayload>(payload.clone()) {
        Ok(decoded) => decoded,
        Err(e) => return Err(DomainError::Internal(format!("DLQ deserialize: {e}"))),
    };
    let event = match DomainEvent::try_from(decoded) {
        Ok(event) => event,
        Err(e) => return Err(DomainError::Internal(format!("DLQ event conversion: {e}"))),
    };
    publisher.publish(&event).await
}
```
- [ ] **Step 2: Add `PgFailedEventStore` to `factory.rs`**
In `crates/worker/src/factory.rs`, add to the imports:
```rust
use postgres::failed_event::PgFailedEventStore;
use std::sync::Arc;
```
Then return it from `build` alongside the consumer and handlers. Change the return type to a new struct:
```rust
/// Everything `factory::build` hands to `main`: the event consumer, the
/// handlers, and the two pieces the DLQ needs.
pub struct WorkerInfra {
// Source of incoming event envelopes for the main loop.
pub consumer: event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
// Handlers invoked for every consumed event.
pub handlers: WorkerHandlers,
// Dead-letter store; shared between the main loop (inserts) and the DLQ processor task.
pub dlq_store: Arc<PgFailedEventStore>,
// Publisher the DLQ processor uses to republish due events.
pub event_publisher: Arc<dyn domain::ports::EventPublisher>,
}
```
At the end of `build`, construct and return:
```rust
let dlq_store = Arc::new(PgFailedEventStore::new(pool.clone()));
// ... existing consumer construction ...
WorkerInfra {
consumer,
handlers,
dlq_store,
event_publisher, // the NATS publisher already constructed in factory
}
```
Note: the factory currently doesn't return the event publisher. Add an `event_publisher` field to `WorkerInfra` and thread the existing `Arc<dyn EventPublisher>` through (it's used for the ActivityPub handler — reuse the same instance).
Read the existing `factory.rs` to see how the NATS publisher is currently constructed and reuse it for both the ActivityPub handler and the returned publisher.
- [ ] **Step 3: Update `main.rs` to use the DLQ**
In `crates/worker/src/main.rs`, update to use the new `WorkerInfra`:
```rust
mod dlq;
mod factory;
mod handlers;
use domain::ports::EventConsumer;
use futures::StreamExt;
use nats::CONSUMER_MAX_DELIVER;
#[tokio::main]
async fn main() {
dotenvy::dotenv().ok();
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
let base_url = std::env::var("BASE_URL").expect("BASE_URL required");
tracing::info!("Building worker...");
let infra = factory::build(&database_url, &base_url, &nats_url).await;
// Spawn DLQ processor as a background task.
tokio::spawn(dlq::run_dlq_processor(
infra.dlq_store.clone(),
infra.event_publisher.clone(),
));
tracing::info!("Worker started, consuming events...");
let mut stream = infra.consumer.consume();
while let Some(result) = stream.next().await {
match result {
Ok(envelope) => {
let event = &envelope.event;
tracing::debug!(?event, "received event");
let n = infra.handlers.notification.handle(event).await;
let f = infra.handlers.federation.handle(event).await;
if n.is_ok() && f.is_ok() {
(envelope.ack)();
} else {
// Log errors.
if let Err(e) = &n {
tracing::error!("notification handler: {e}");
}
if let Err(e) = &f {
tracing::error!("federation handler: {e}");
}
// On last delivery attempt — insert to DLQ then ack.
// On earlier attempts — nack so NATS retries.
if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
let error_msg = n
.err()
.or(f.err())
.map(|e| e.to_string())
.unwrap_or_else(|| "unknown error".into());
let payload = serde_json::to_value(&event_payload::EventPayload::from(event))
.unwrap_or(serde_json::Value::Null);
let event_type = format!("{:?}", event)
.split_whitespace()
.next()
.unwrap_or("Unknown")
.to_string();
if let Err(e) = infra
.dlq_store
.insert(&event_type, &payload, &error_msg)
.await
{
tracing::error!("DLQ insert failed: {e} — message lost");
} else {
tracing::warn!(
event_type,
delivery_count = envelope.delivery_count,
"event exhausted — moved to DLQ"
);
}
(envelope.ack)(); // ack from NATS — DLQ owns it now
} else {
(envelope.nack)(); // let NATS retry
}
}
}
Err(e) => tracing::error!("consumer error: {e}"),
}
}
}
```
Note: `CONSUMER_MAX_DELIVER` must be exported from `crates/adapters/nats/src/lib.rs` for this import to compile. Task 2 introduces it as a private constant; Step 4 below makes it `pub`.
- [ ] **Step 4: Export `CONSUMER_MAX_DELIVER` from nats crate**
In `crates/adapters/nats/src/lib.rs`, change the constant to `pub`:
```rust
pub const CONSUMER_MAX_DELIVER: i64 = 5;
```
- [ ] **Step 5: Full workspace compile check**
```bash
cargo check --workspace 2>&1 | head -40
```
Fix all errors. Common issues:
- Missing imports in `main.rs` for `event_payload`
- `event` variable lifetime in the DLQ insert block — may need to clone `event`
- `WorkerInfra` construction in `factory.rs` missing fields
- [ ] **Step 6: Verify tests still pass**
```bash
cargo test --workspace 2>&1 | tail -5
```
Expected: all tests pass.
- [ ] **Step 7: Commit**
```bash
git add crates/worker/src/dlq.rs \
crates/worker/src/factory.rs \
crates/worker/src/main.rs \
crates/adapters/nats/src/lib.rs
git commit -m "feat(worker): DLQ processor — exhausted events moved to failed_events with exponential retry"
```
---
## Self-Review
**Spec coverage:**
| Spec requirement | Task |
|---|---|
| `CONSUMER_MAX_DELIVER = 5` constant | Task 2 |
| `CONSUMER_ACK_WAIT_SECS = 30` constant | Task 2 |
| `ACK_TASK_TIMEOUT_SECS = 5` constant | Task 2 |
| Explicit consumer config (deliver_policy, ack_policy, ack_wait, max_deliver) | Task 2 |
| Ack task with timeout | Task 2 |
| Nack task with timeout | Task 2 |
| Unknown event types acked before drop | Task 3 |
| `delivery_count` threaded through to worker | Tasks 2, 3 |
| `failed_events` migration | Task 4 |
| `PgFailedEventStore` (insert, poll_due, advance, park_permanently) | Task 5 |
| DLQ processor (poll interval, exponential backoff, park permanently) | Task 6 |
| Worker inserts to DLQ at `delivery_count >= max_deliver` | Task 6 |
| `JWT_SECRET_MIN_BYTES = 32` constant | Task 1 |
| Panic on weak secret at startup | Task 1 |
| `JWT_TTL_SECS = 86_400` (24h) | Task 1 |
| Timing equalization on failed login | Task 1 |
**No placeholders found.**
**Type consistency:** `CONSUMER_MAX_DELIVER: i64` in Task 2; cast to `u64` for comparison with `envelope.delivery_count: u64` in Task 6 (`>= CONSUMER_MAX_DELIVER as u64`). Consistent. `DLQ_MAX_RETRIES: i32` matches `retry_count: i32` in `FailedEvent`. `DLQ_INITIAL_BACKOFF_SECS: i64` used with `chrono::Duration::seconds(i64)`. All consistent.

View File

@@ -0,0 +1,193 @@
# NATS Hardening, Dead-Letter Queue & Auth Security Design
**Goal:** Fix five reliability issues in the NATS adapter, introduce an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps.
---
## Section 1: NATS Consumer Hardening
### Consumer configuration
All magic numbers become named constants in `crates/adapters/nats/src/lib.rs`:
```rust
/// Maximum delivery attempts per message before it is considered exhausted.
/// Public because the worker compares `delivery_count` against it.
pub const CONSUMER_MAX_DELIVER: i64 = 5;
/// Seconds NATS waits for an ack before redelivering a message.
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
/// Seconds a spawned ack/nack task may run before it is abandoned.
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
```
The pull consumer config changes from `..Default::default()` to explicit settings:
```rust
pull::Config {
durable_name: Some(CONSUMER_NAME.to_string()),
deliver_policy: DeliverPolicy::New,
ack_policy: AckPolicy::Explicit,
ack_wait: Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
max_deliver: CONSUMER_MAX_DELIVER,
..Default::default()
}
```
- `DeliverPolicy::New` — worker restarts from the current position, not from the beginning of the stream
- `AckPolicy::Explicit` — each message must be acknowledged individually (already the default, but now stated explicitly)
- `ack_wait` — if the worker hangs for 30s without acking, NATS redelivers
- `max_deliver` — after 5 failed deliveries the message is exhausted; the DLQ picks it up
### Ack task timeout
Spawned ack/nack tasks currently have no timeout. If NATS is stuck, they hang forever. Wrap with `tokio::time::timeout`:
```rust
ack: Box::new(move || {
let m = Arc::clone(&msg);
tokio::spawn(async move {
let result = tokio::time::timeout(
Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
m.ack(),
).await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
Err(_) => tracing::warn!("NATS ack timed out"),
}
});
}),
```
Same pattern for nack.
### Unknown event type acking
Currently unknown event types are silently dropped via `filter_map` and never acked — they orphan in the stream until `max_deliver` is exceeded. Fix: ack unknown messages explicitly before discarding:
In `event-transport/src/lib.rs`, when deserialization fails, ack the raw NATS message before returning `None`:
```rust
Err(e) => {
    tracing::warn!("unknown or malformed event, acking to prevent orphan: {e}");
    // ack the message so it doesn't loop forever.
    // `ack()` returns a future that does nothing until awaited, so it must be
    // awaited here. NOTE(review): assumes this arm runs in an async context
    // (e.g. an async `filter_map` closure) — confirm against the adapter.
    if let Err(ack_err) = msg.ack().await {
        tracing::warn!("NATS ack failed for unknown event: {ack_err}");
    }
    None
}
```
---
## Section 2: Dead-Letter Queue
### Schema
New migration `009_failed_events.sql`:
```sql
CREATE TABLE failed_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
failed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
retry_at TIMESTAMPTZ NOT NULL,
retry_count INT NOT NULL DEFAULT 0,
last_error TEXT NOT NULL
);
CREATE INDEX failed_events_retry_at_idx ON failed_events (retry_at)
WHERE retry_count < 3;
```
### Constants
In `crates/adapters/postgres/src/failed_event.rs`, next to `PgFailedEventStore`, so that both the store and the worker's DLQ processor can import them:
```rust
/// Quarantine period before the first DLQ retry (seconds); doubles each retry.
/// `i64` so it can be passed directly to `chrono::Duration::seconds`.
pub const DLQ_INITIAL_BACKOFF_SECS: i64 = 300; // 5 minutes
/// Maximum number of DLQ republish attempts per failed event.
pub const DLQ_MAX_RETRIES: i32 = 3;
/// How often the DLQ processor polls for due rows (seconds).
pub const DLQ_POLL_INTERVAL_SECS: u64 = 60; // check every minute
```
### Worker flow
**On exhausted message** (detected when `num_delivered >= CONSUMER_MAX_DELIVER`):
1. Worker inserts row to `failed_events` with `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS`
2. Worker **acks** the NATS message (removes it from the stream)
3. Message will be retried by the DLQ processor, not by NATS
**DlqProcessor** — runs in the worker on a `DLQ_POLL_INTERVAL_SECS` tick:
1. Query: `SELECT * FROM failed_events WHERE retry_at <= now() AND retry_count < DLQ_MAX_RETRIES`
2. For each row: republish the `payload` JSONB to the NATS `thoughts-events` main stream
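3. Update row: `retry_count += 1`, `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS * 2^retry_count`, where `retry_count` is the already-incremented value. Together with the initial 5-minute quarantine set on insert, the waits before the three retries are 5m, 10m, 20m (exponential backoff).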
4. If republish fails: update `last_error`, leave row for next poll
5. When the republished message is processed successfully by the worker, the event handler completes normally — the `failed_events` row is deleted on success (see below)
**On DLQ retry success detection**: After republishing, the DLQ processor subscribes to the ack signal OR the processor marks rows as `retry_count = DLQ_MAX_RETRIES` optimistically and lets the event handler delete the row if the event type matches. Simpler: the DLQ row is deleted when `retry_count` reaches the threshold and the message is republished for the final time. If the final attempt also fails, it stays in the table as a permanently failed record with `retry_count = DLQ_MAX_RETRIES` for manual inspection.
### Permanently failed messages
Rows with `retry_count >= DLQ_MAX_RETRIES AND retry_at <= now()` are permanently failed. The DLQ processor:
- Logs them at `ERROR` level with full payload
- Sets `retry_at = now() + 365 days` (parking them out of the active query range)
- Does NOT delete them — they remain visible for manual inspection
A future admin endpoint can query and replay them, but that is out of scope for this spec.
---
## Section 3: Auth Hardening
### JWT secret validation
In `crates/bootstrap/src/factory.rs`, before constructing `JwtAuthService`:
```rust
const JWT_SECRET_MIN_BYTES: usize = 32;
if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES {
panic!(
"JWT_SECRET is too short ({} bytes). \
Minimum is {} bytes for HS256 security.",
cfg.jwt_secret.len(),
JWT_SECRET_MIN_BYTES
);
}
```
Startup panics are appropriate here — running with a weak secret is a security failure.
### Timing equalization on failed login
In `crates/application/src/use_cases/auth.rs`, in the `login` function, when the user is not found by email:
```rust
/// Returns a fixed Argon2id PHC string to verify against when no user matches
/// the login email.
fn dummy_hash() -> argon2::PasswordHash<'static> {
// Placeholder PHC string whose salt and digest fields consist solely of `A`
// characters; it does not appear to be the computed hash of any real
// password. Used only to equalize timing
// on failed lookups so attackers cannot enumerate valid emails.
// NOTE(review): timing is only equalized if the cost parameters encoded here
// (m=19456,t=2,p=1) match those of real user hashes — confirm.
argon2::PasswordHash::new(
"$argon2id$v=19$m=19456,t=2,p=1$\
AAAAAAAAAAAAAAAAAAAAAA$\
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
).expect("dummy hash is valid")
}
// In login():
if user.is_none() {
let _ = Argon2::default().verify_password(plain.as_bytes(), &dummy_hash());
return Err(DomainError::Unauthorized);
}
```
### JWT TTL reduction
In `crates/bootstrap/src/factory.rs`, the existing `JWT_TTL_SECS` constant:
```rust
const JWT_TTL_SECS: i64 = 86_400; // 24 hours (was 30 days)
```
---
## What does NOT change
- NATS subject naming (`thoughts-events.>`) — unchanged
- `MAX_MESSAGES` stream limit (100k) — unchanged; monitoring is out of scope
- API surface, domain events, application layer — unchanged
- Auth extractor, claims structure (`sub`, `exp`) — unchanged