feat(worker): DLQ processor — exhausted events moved to failed_events with exponential retry

This commit is contained in:
2026-05-15 16:26:44 +02:00
parent c092b9e658
commit e43d784c39
4 changed files with 138 additions and 20 deletions

View File

@@ -12,6 +12,7 @@ domain = { workspace = true }
application = { workspace = true } application = { workspace = true }
nats = { workspace = true } nats = { workspace = true }
event-transport = { workspace = true } event-transport = { workspace = true }
event-payload = { workspace = true }
activitypub-base = { workspace = true } activitypub-base = { workspace = true }
activitypub = { workspace = true } activitypub = { workspace = true }
postgres = { workspace = true } postgres = { workspace = true }
@@ -22,6 +23,7 @@ futures = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
dotenvy = { workspace = true } dotenvy = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true } sqlx = { workspace = true }
[dev-dependencies] [dev-dependencies]

64
crates/worker/src/dlq.rs Normal file
View File

@@ -0,0 +1,64 @@
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use postgres::failed_event::{PgFailedEventStore, DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS};
use std::sync::Arc;
/// Background task: polls `failed_events` and republishes due rows to the event bus.
pub async fn run_dlq_processor(store: Arc<PgFailedEventStore>, publisher: Arc<dyn EventPublisher>) {
let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS);
loop {
tokio::time::sleep(interval).await;
if let Err(e) = process_due(&store, &*publisher).await {
tracing::error!("DLQ processor error: {e}");
}
}
}
/// Runs a single DLQ pass over every row returned by `store.poll_due()`.
///
/// For each row:
/// * if `retry_count` has reached `DLQ_MAX_RETRIES`, the row is handed to
///   `park_permanently` and no republish is attempted;
/// * otherwise the payload is republished and `advance` is called with `None`
///   on success or with the error text on failure.
///
/// # Errors
///
/// Returns the first `sqlx::Error` raised by a store call (`poll_due`,
/// `park_permanently`, `advance`). That error ends the pass immediately, so
/// rows later in the batch are not touched. Republish failures are not
/// returned; they are logged and passed to `advance`.
async fn process_due(
    store: &PgFailedEventStore,
    publisher: &dyn EventPublisher,
) -> Result<(), sqlx::Error> {
    let due_rows = store.poll_due().await?;
    if due_rows.is_empty() {
        return Ok(());
    }
    tracing::info!(count = due_rows.len(), "DLQ: processing due events");

    for row in due_rows {
        let exhausted = row.retry_count >= DLQ_MAX_RETRIES;

        if exhausted {
            tracing::error!(
                id = %row.id,
                event_type = %row.event_type,
                retry_count = row.retry_count,
                "DLQ: event permanently failed — parking",
            );
            store.park_permanently(row.id).await?;
        } else {
            // Reduce the republish outcome to the optional error text that
            // `advance` expects: `None` on success, the message on failure.
            let failure: Option<String> = match republish(&row.payload, publisher).await {
                Ok(()) => {
                    tracing::info!(id = %row.id, "DLQ: republished successfully");
                    None
                }
                Err(e) => {
                    tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed");
                    Some(e.to_string())
                }
            };
            store.advance(row.id, failure.as_deref()).await?;
        }
    }

    Ok(())
}
async fn republish(
payload: &serde_json::Value,
publisher: &dyn EventPublisher,
) -> Result<(), DomainError> {
use event_payload::EventPayload;
let ep: EventPayload = serde_json::from_value(payload.clone())
.map_err(|e| DomainError::Internal(format!("DLQ deserialize: {e}")))?;
let event = DomainEvent::try_from(ep)
.map_err(|e| DomainError::Internal(format!("DLQ event conversion: {e}")))?;
publisher.publish(&event).await
}

View File

@@ -1,10 +1,11 @@
use postgres::failed_event::PgFailedEventStore;
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;
use activitypub::ThoughtsObjectHandler; use activitypub::ThoughtsObjectHandler;
use activitypub_base::ActivityPubService; use activitypub_base::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService}; use application::services::{FederationEventService, NotificationEventService};
use domain::ports::{ActivityPubRepository, OutboundFederationPort}; use domain::ports::{ActivityPubRepository, EventPublisher, OutboundFederationPort};
use postgres::activitypub::PgActivityPubRepository; use postgres::activitypub::PgActivityPubRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
@@ -15,14 +16,14 @@ pub struct WorkerHandlers {
pub federation: FederationHandler, pub federation: FederationHandler,
} }
pub async fn build( pub struct WorkerInfra {
database_url: &str, pub consumer: event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
base_url: &str, pub handlers: WorkerHandlers,
nats_url: &str, pub dlq_store: Arc<PgFailedEventStore>,
) -> ( pub event_publisher: Arc<dyn EventPublisher>,
event_transport::EventConsumerAdapter<nats::NatsMessageSource>, }
WorkerHandlers,
) { pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> WorkerInfra {
let pool = PgPool::connect(database_url) let pool = PgPool::connect(database_url)
.await .await
.expect("DB connect failed"); .expect("DB connect failed");
@@ -83,15 +84,27 @@ pub async fn build(
}, },
}; };
// NATS consumer // DLQ store
let dlq_store = Arc::new(PgFailedEventStore::new(pool));
// NATS consumer + publisher
let nats_client = async_nats::connect(nats_url) let nats_client = async_nats::connect(nats_url)
.await .await
.expect("NATS connect failed"); .expect("NATS connect failed");
nats::ensure_stream(&nats_client) nats::ensure_stream(&nats_client)
.await .await
.expect("JetStream stream setup failed"); .expect("JetStream stream setup failed");
let consumer = let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(
event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client)); nats_client.clone(),
));
let event_publisher: Arc<dyn EventPublisher> = Arc::new(
event_transport::EventPublisherAdapter::new(nats::NatsTransport::new(nats_client)),
);
(consumer, handlers) WorkerInfra {
consumer,
handlers,
dlq_store,
event_publisher,
}
} }

View File

@@ -1,8 +1,10 @@
mod dlq;
mod factory; mod factory;
mod handlers; mod handlers;
use domain::ports::EventConsumer; use domain::ports::EventConsumer;
use futures::StreamExt; use futures::StreamExt;
use nats::CONSUMER_MAX_DELIVER;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@@ -16,31 +18,68 @@ async fn main() {
let base_url = std::env::var("BASE_URL").expect("BASE_URL required"); let base_url = std::env::var("BASE_URL").expect("BASE_URL required");
tracing::info!("Building worker..."); tracing::info!("Building worker...");
let (consumer, handlers) = factory::build(&database_url, &base_url, &nats_url).await; let infra = factory::build(&database_url, &base_url, &nats_url).await;
// Spawn DLQ processor as a background task.
tokio::spawn(dlq::run_dlq_processor(
infra.dlq_store.clone(),
infra.event_publisher.clone(),
));
tracing::info!("Worker started, consuming events..."); tracing::info!("Worker started, consuming events...");
let mut stream = consumer.consume(); let mut stream = infra.consumer.consume();
while let Some(result) = stream.next().await { while let Some(result) = stream.next().await {
match result { match result {
Ok(envelope) => { Ok(envelope) => {
let event = &envelope.event; let event = &envelope.event;
tracing::debug!(?event, "received event"); tracing::debug!(?event, "received event");
let n = handlers.notification.handle(event).await; let n = infra.handlers.notification.handle(event).await;
let f = handlers.federation.handle(event).await; let f = infra.handlers.federation.handle(event).await;
if n.is_ok() && f.is_ok() { if n.is_ok() && f.is_ok() {
(envelope.ack)(); (envelope.ack)();
} else { } else {
if let Err(e) = n { if let Err(e) = &n {
tracing::error!("notification handler: {e}"); tracing::error!("notification handler: {e}");
} }
if let Err(e) = f { if let Err(e) = &f {
tracing::error!("federation handler: {e}"); tracing::error!("federation handler: {e}");
} }
// Last delivery attempt -> move to DLQ then ack.
// Earlier attempts -> nack so NATS retries.
if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
let error_msg = n
.err()
.or(f.err())
.map(|e| e.to_string())
.unwrap_or_else(|| "unknown error".into());
// Serialize event back to payload for storage.
let ep = event_payload::EventPayload::from(event);
let event_type = ep.subject().to_string();
let payload = serde_json::to_value(&ep).unwrap_or(serde_json::Value::Null);
if let Err(e) = infra
.dlq_store
.insert(&event_type, &payload, &error_msg)
.await
{
tracing::error!("DLQ insert failed: {e} — message lost");
} else {
tracing::warn!(
event_type,
delivery_count = envelope.delivery_count,
"event exhausted — moved to DLQ"
);
}
(envelope.ack)(); // ack from NATS — DLQ owns it now
} else {
(envelope.nack)(); (envelope.nack)();
} }
} }
}
Err(e) => tracing::error!("consumer error: {e}"), Err(e) => tracing::error!("consumer error: {e}"),
} }
} }