From 40ed9b1ad87d162902444d4296c8a64eb634605a Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 23:19:41 +0200 Subject: [PATCH] fix(nats): delete+recreate stream when retention policy is incompatible --- crates/adapters/nats/src/lib.rs | 35 ++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index a860c13..268d9ba 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -19,25 +19,28 @@ fn stream_config() -> StreamConfig { } } -/// Ensure the JetStream stream exists and has the current subject list. -/// Idempotent — creates if absent, updates subjects if already present. +/// Ensure the JetStream stream exists with the current config. +/// If an incompatible stream exists (e.g. wrong retention policy), deletes and recreates it. pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> { let js = jetstream::new(client.clone()); - // Try to update first (covers the case where stream exists with stale subjects). - // Falls back to create if the stream doesn't exist yet. - match js.update_stream(stream_config()).await { - Ok(_) => { - tracing::info!(subject = STREAM_SUBJECT, "JetStream stream updated"); - Ok(()) - } - Err(e) => { - tracing::warn!("JetStream stream update failed ({e}), falling back to get_or_create"); - js.get_or_create_stream(stream_config()) - .await - .map(|_| ()) - .map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}"))) - } + + // Happy path: stream exists and config is compatible. + if js.update_stream(stream_config()).await.is_ok() { + tracing::info!(subject = STREAM_SUBJECT, "JetStream stream updated"); + return Ok(()); } + + // Update failed — retention policy mismatch or other incompatibility. + // Delete the old stream and recreate with current config. + tracing::warn!( + "JetStream stream update failed (incompatible config), deleting '{STREAM_NAME}' and recreating" + ); + let _ = js.delete_stream(STREAM_NAME).await; + + js.create_stream(stream_config()) + .await + .map(|_| ()) + .map_err(|e| DomainError::Internal(format!("JetStream stream create failed: {e}"))) } // ── NatsTransport — JetStream publish ──────────────────────────────────────