From c092b9e65878f20604e754b67425d88fc8e1481f Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 16:23:21 +0200 Subject: [PATCH] feat(postgres): failed_events table and PgFailedEventStore for dead-letter queue --- crates/adapters/postgres/Cargo.toml | 1 + .../postgres/migrations/009_failed_events.sql | 15 +++ crates/adapters/postgres/src/failed_event.rs | 105 ++++++++++++++++++ crates/adapters/postgres/src/lib.rs | 1 + 4 files changed, 122 insertions(+) create mode 100644 crates/adapters/postgres/migrations/009_failed_events.sql create mode 100644 crates/adapters/postgres/src/failed_event.rs diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index 0e20014..f30f243 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" domain = { workspace = true } sqlx = { workspace = true } uuid = { workspace = true } +serde_json = { workspace = true } chrono = { workspace = true } async-trait = { workspace = true } thiserror = { workspace = true } diff --git a/crates/adapters/postgres/migrations/009_failed_events.sql b/crates/adapters/postgres/migrations/009_failed_events.sql new file mode 100644 index 0000000..4a558a6 --- /dev/null +++ b/crates/adapters/postgres/migrations/009_failed_events.sql @@ -0,0 +1,15 @@ +CREATE TABLE failed_events ( + id UUID NOT NULL DEFAULT gen_random_uuid(), + event_type TEXT NOT NULL, + payload JSONB NOT NULL, + failed_at TIMESTAMPTZ NOT NULL DEFAULT now(), + retry_at TIMESTAMPTZ NOT NULL, + retry_count INT NOT NULL DEFAULT 0, + last_error TEXT NOT NULL, + + CONSTRAINT failed_events_pkey PRIMARY KEY (id) +); + +CREATE INDEX failed_events_due_idx + ON failed_events (retry_at) + WHERE retry_count < 3; diff --git a/crates/adapters/postgres/src/failed_event.rs b/crates/adapters/postgres/src/failed_event.rs new file mode 100644 index 0000000..9aa79fb --- /dev/null +++ b/crates/adapters/postgres/src/failed_event.rs @@ -0,0 +1,105 @@ +use chrono::{DateTime, Utc}; +use sqlx::PgPool; + +/// How many times a failed event is retried by the DLQ processor. +pub const DLQ_MAX_RETRIES: i32 = 3; +/// Quarantine period for the first DLQ retry (seconds). Doubles each retry. +pub const DLQ_INITIAL_BACKOFF_SECS: i64 = 300; // 5 minutes +/// How often the DLQ processor polls for due retries (seconds). +pub const DLQ_POLL_INTERVAL_SECS: u64 = 60; + +#[derive(sqlx::FromRow)] +pub struct FailedEvent { + pub id: uuid::Uuid, + pub event_type: String, + pub payload: serde_json::Value, + pub failed_at: DateTime, + pub retry_at: DateTime, + pub retry_count: i32, + pub last_error: String, +} + +pub struct PgFailedEventStore { + pool: PgPool, +} + +impl PgFailedEventStore { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + /// Insert a newly exhausted event into the DLQ. + pub async fn insert( + &self, + event_type: &str, + payload: &serde_json::Value, + last_error: &str, + ) -> Result<(), sqlx::Error> { + let retry_at = Utc::now() + chrono::Duration::seconds(DLQ_INITIAL_BACKOFF_SECS); + sqlx::query( + "INSERT INTO failed_events \ + (event_type, payload, retry_at, last_error) \ + VALUES ($1, $2, $3, $4)", + ) + .bind(event_type) + .bind(payload) + .bind(retry_at) + .bind(last_error) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Fetch all events due for retry (retry_at <= now, retry_count < DLQ_MAX_RETRIES). + pub async fn poll_due(&self) -> Result, sqlx::Error> { + sqlx::query_as::<_, FailedEvent>( + "SELECT id, event_type, payload, failed_at, retry_at, retry_count, last_error \ + FROM failed_events \ + WHERE retry_at <= now() AND retry_count < $1 \ + ORDER BY retry_at \ + LIMIT 100", + ) + .bind(DLQ_MAX_RETRIES) + .fetch_all(&self.pool) + .await + } + + /// Advance a row after a republish attempt using exponential backoff. + /// next_retry = now + initial * 2^retry_count + pub async fn advance(&self, id: uuid::Uuid, error: Option<&str>) -> Result<(), sqlx::Error> { + let current: i32 = + sqlx::query_scalar("SELECT retry_count FROM failed_events WHERE id = $1") + .bind(id) + .fetch_one(&self.pool) + .await?; + + let new_count = current + 1; + let backoff_secs = DLQ_INITIAL_BACKOFF_SECS * (1_i64 << new_count.min(10)); + let retry_at = Utc::now() + chrono::Duration::seconds(backoff_secs); + let last_error = error.unwrap_or("republish succeeded"); + + sqlx::query( + "UPDATE failed_events \ + SET retry_count = $1, retry_at = $2, last_error = $3 \ + WHERE id = $4", + ) + .bind(new_count) + .bind(retry_at) + .bind(last_error) + .bind(id) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Park a permanently failed event (retry_count >= DLQ_MAX_RETRIES). + pub async fn park_permanently(&self, id: uuid::Uuid) -> Result<(), sqlx::Error> { + let far_future = Utc::now() + chrono::Duration::days(365); + sqlx::query("UPDATE failed_events SET retry_at = $1 WHERE id = $2") + .bind(far_future) + .bind(id) + .execute(&self.pool) + .await?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index d22cd34..29e2f1c 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -3,6 +3,7 @@ pub mod api_key; pub mod block; pub mod boost; mod db_error; +pub mod failed_event; pub mod feed; pub mod follow; pub mod like;