From e43d784c397eae9e661f48378b501c2e0f646a39 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 16:26:44 +0200 Subject: [PATCH] =?UTF-8?q?feat(worker):=20DLQ=20processor=20=E2=80=94=20e?= =?UTF-8?q?xhausted=20events=20moved=20to=20failed=5Fevents=20with=20expon?= =?UTF-8?q?ential=20retry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/worker/Cargo.toml | 2 ++ crates/worker/src/dlq.rs | 64 ++++++++++++++++++++++++++++++++++++ crates/worker/src/factory.rs | 39 ++++++++++++++-------- crates/worker/src/main.rs | 53 +++++++++++++++++++++++++---- 4 files changed, 138 insertions(+), 20 deletions(-) create mode 100644 crates/worker/src/dlq.rs diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 8a22dd5..54fa65a 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -12,6 +12,7 @@ domain = { workspace = true } application = { workspace = true } nats = { workspace = true } event-transport = { workspace = true } +event-payload = { workspace = true } activitypub-base = { workspace = true } activitypub = { workspace = true } postgres = { workspace = true } @@ -22,6 +23,7 @@ futures = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } dotenvy = { workspace = true } +serde_json = { workspace = true } sqlx = { workspace = true } [dev-dependencies] diff --git a/crates/worker/src/dlq.rs b/crates/worker/src/dlq.rs new file mode 100644 index 0000000..4f66fb8 --- /dev/null +++ b/crates/worker/src/dlq.rs @@ -0,0 +1,64 @@ +use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use postgres::failed_event::{PgFailedEventStore, DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS}; +use std::sync::Arc; + +/// Background task: polls `failed_events` and republishes due rows to the event bus. +pub async fn run_dlq_processor(store: Arc, publisher: Arc) { + let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS); + loop { + tokio::time::sleep(interval).await; + if let Err(e) = process_due(&store, &*publisher).await { + tracing::error!("DLQ processor error: {e}"); + } + } +} + +async fn process_due( + store: &PgFailedEventStore, + publisher: &dyn EventPublisher, +) -> Result<(), sqlx::Error> { + let due = store.poll_due().await?; + if due.is_empty() { + return Ok(()); + } + tracing::info!(count = due.len(), "DLQ: processing due events"); + + for row in due { + if row.retry_count >= DLQ_MAX_RETRIES { + tracing::error!( + id = %row.id, + event_type = %row.event_type, + retry_count = row.retry_count, + "DLQ: event permanently failed — parking", + ); + store.park_permanently(row.id).await?; + continue; + } + + let republish_result = republish(&row.payload, publisher).await; + + match republish_result { + Ok(()) => { + tracing::info!(id = %row.id, "DLQ: republished successfully"); + store.advance(row.id, None).await?; + } + Err(e) => { + tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed"); + store.advance(row.id, Some(&e.to_string())).await?; + } + } + } + Ok(()) +} + +async fn republish( + payload: &serde_json::Value, + publisher: &dyn EventPublisher, +) -> Result<(), DomainError> { + use event_payload::EventPayload; + let ep: EventPayload = serde_json::from_value(payload.clone()) + .map_err(|e| DomainError::Internal(format!("DLQ deserialize: {e}")))?; + let event = DomainEvent::try_from(ep) + .map_err(|e| DomainError::Internal(format!("DLQ event conversion: {e}")))?; + publisher.publish(&event).await +} diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index 49cac74..12da308 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -1,10 +1,11 @@ +use postgres::failed_event::PgFailedEventStore; use sqlx::PgPool; use std::sync::Arc; use activitypub::ThoughtsObjectHandler; use activitypub_base::ActivityPubService; use application::services::{FederationEventService, NotificationEventService}; -use domain::ports::{ActivityPubRepository, OutboundFederationPort}; +use domain::ports::{ActivityPubRepository, EventPublisher, OutboundFederationPort}; use postgres::activitypub::PgActivityPubRepository; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; @@ -15,14 +16,14 @@ pub struct WorkerHandlers { pub federation: FederationHandler, } -pub async fn build( - database_url: &str, - base_url: &str, - nats_url: &str, -) -> ( - event_transport::EventConsumerAdapter, - WorkerHandlers, -) { +pub struct WorkerInfra { + pub consumer: event_transport::EventConsumerAdapter, + pub handlers: WorkerHandlers, + pub dlq_store: Arc, + pub event_publisher: Arc, +} + +pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> WorkerInfra { let pool = PgPool::connect(database_url) .await .expect("DB connect failed"); @@ -83,15 +84,27 @@ pub async fn build( }, }; - // NATS consumer + // DLQ store + let dlq_store = Arc::new(PgFailedEventStore::new(pool)); + + // NATS consumer + publisher let nats_client = async_nats::connect(nats_url) .await .expect("NATS connect failed"); nats::ensure_stream(&nats_client) .await .expect("JetStream stream setup failed"); - let consumer = - event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client)); + let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new( + nats_client.clone(), + )); + let event_publisher: Arc = Arc::new( + event_transport::EventPublisherAdapter::new(nats::NatsTransport::new(nats_client)), + ); - (consumer, handlers) + WorkerInfra { + consumer, + handlers, + dlq_store, + event_publisher, + } } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 8a699fb..febe52f 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,8 +1,10 @@ +mod dlq; mod factory; mod handlers; use domain::ports::EventConsumer; use futures::StreamExt; +use nats::CONSUMER_MAX_DELIVER; #[tokio::main] async fn main() { @@ -16,29 +18,66 @@ async fn main() { let base_url = std::env::var("BASE_URL").expect("BASE_URL required"); tracing::info!("Building worker..."); - let (consumer, handlers) = factory::build(&database_url, &base_url, &nats_url).await; + let infra = factory::build(&database_url, &base_url, &nats_url).await; + + // Spawn DLQ processor as a background task. + tokio::spawn(dlq::run_dlq_processor( + infra.dlq_store.clone(), + infra.event_publisher.clone(), + )); tracing::info!("Worker started, consuming events..."); - let mut stream = consumer.consume(); + let mut stream = infra.consumer.consume(); while let Some(result) = stream.next().await { match result { Ok(envelope) => { let event = &envelope.event; tracing::debug!(?event, "received event"); - let n = handlers.notification.handle(event).await; - let f = handlers.federation.handle(event).await; + let n = infra.handlers.notification.handle(event).await; + let f = infra.handlers.federation.handle(event).await; if n.is_ok() && f.is_ok() { (envelope.ack)(); } else { - if let Err(e) = n { + if let Err(e) = &n { tracing::error!("notification handler: {e}"); } - if let Err(e) = f { + if let Err(e) = &f { tracing::error!("federation handler: {e}"); } - (envelope.nack)(); + + // Last delivery attempt -> move to DLQ then ack. + // Earlier attempts -> nack so NATS retries. + if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 { + let error_msg = n + .err() + .or(f.err()) + .map(|e| e.to_string()) + .unwrap_or_else(|| "unknown error".into()); + + // Serialize event back to payload for storage. + let ep = event_payload::EventPayload::from(event); + let event_type = ep.subject().to_string(); + let payload = serde_json::to_value(&ep).unwrap_or(serde_json::Value::Null); + + if let Err(e) = infra + .dlq_store + .insert(&event_type, &payload, &error_msg) + .await + { + tracing::error!("DLQ insert failed: {e} — message lost"); + } else { + tracing::warn!( + event_type, + delivery_count = envelope.delivery_count, + "event exhausted — moved to DLQ" + ); + } + (envelope.ack)(); // ack from NATS — DLQ owns it now + } else { + (envelope.nack)(); + } } } Err(e) => tracing::error!("consumer error: {e}"),