diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index eb24139..a2a9170 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -113,6 +113,17 @@ impl MessageSource for NatsMessageSource { } }; + // Delete any existing push consumer with this name — can't reuse as pull. + // No-op if it doesn't exist or is already a pull consumer. + if let Ok(info) = stream.consumer_info(CONSUMER_NAME).await { + if info.config.deliver_subject.is_some() { + tracing::info!( + "deleting old push consumer '{CONSUMER_NAME}', replacing with pull" + ); + let _ = stream.delete_consumer(CONSUMER_NAME).await; + } + } + let consumer = match stream .get_or_create_consumer( CONSUMER_NAME,