fix(nats): delete+recreate stream when retention policy is incompatible
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Has been cancelled
test / unit (pull_request) Has been cancelled
test / integration (pull_request) Has been cancelled
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Has been cancelled
test / unit (pull_request) Has been cancelled
test / integration (pull_request) Has been cancelled
This commit is contained in:
@@ -19,25 +19,28 @@ fn stream_config() -> StreamConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensure the JetStream stream exists and has the current subject list.
|
||||
/// Idempotent — creates if absent, updates subjects if already present.
|
||||
/// Ensure the JetStream stream exists with the current config.
|
||||
/// If an incompatible stream exists (e.g. wrong retention policy), deletes and recreates it.
|
||||
pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> {
|
||||
let js = jetstream::new(client.clone());
|
||||
// Try to update first (covers the case where stream exists with stale subjects).
|
||||
// Falls back to create if the stream doesn't exist yet.
|
||||
match js.update_stream(stream_config()).await {
|
||||
Ok(_) => {
|
||||
tracing::info!(subject = STREAM_SUBJECT, "JetStream stream updated");
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("JetStream stream update failed ({e}), falling back to get_or_create");
|
||||
js.get_or_create_stream(stream_config())
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}")))
|
||||
}
|
||||
|
||||
// Happy path: stream exists and config is compatible.
|
||||
if js.update_stream(stream_config()).await.is_ok() {
|
||||
tracing::info!(subject = STREAM_SUBJECT, "JetStream stream updated");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Update failed — retention policy mismatch or other incompatibility.
|
||||
// Delete the old stream and recreate with current config.
|
||||
tracing::warn!(
|
||||
"JetStream stream update failed (incompatible config), deleting '{STREAM_NAME}' and recreating"
|
||||
);
|
||||
let _ = js.delete_stream(STREAM_NAME).await;
|
||||
|
||||
js.create_stream(stream_config())
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|e| DomainError::Internal(format!("JetStream stream create failed: {e}")))
|
||||
}
|
||||
|
||||
// ── NatsTransport — JetStream publish ──────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user