From 0caca58c1ca6a43ba05dcc7db3af10b98b5a1532 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 23:17:57 +0200 Subject: [PATCH] =?UTF-8?q?fix(nats):=20align=20with=20movies-diary=20?= =?UTF-8?q?=E2=80=94=20Limits=20retention,=20single=20wildcard=20subject,?= =?UTF-8?q?=20filter=5Fsubject=20on=20consumer,=20prefixed=20publish?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/adapters/nats/src/lib.rs | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index a2a9170..a860c13 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -5,29 +5,16 @@ use event_transport::{MessageSource, RawMessage, Transport}; use futures::stream::BoxStream; use std::sync::Arc; -// Stream name and subjects used by both publisher and consumer. const STREAM_NAME: &str = "THOUGHTS_EVENTS"; -// Explicit prefixes instead of ">" — NATS WorkQueue retention disallows -// the catch-all ">" wildcard without also setting no_ack = true. -const STREAM_SUBJECTS: &[&str] = &[ - "thoughts.>", - "likes.>", - "boosts.>", - "follows.>", - "users.>", - "federation.>", -]; +const STREAM_SUBJECT: &str = "thoughts-events.>"; const CONSUMER_NAME: &str = "worker"; -// Redelivery timeout: if a message is not acked within this time, NATS redelivers it. -const ACK_WAIT_SECS: u64 = 30; -// Maximum delivery attempts before the message goes to a dead-letter stream (if configured). -const MAX_DELIVER: i64 = 5; +const MAX_MESSAGES: i64 = 100_000; fn stream_config() -> StreamConfig { StreamConfig { name: STREAM_NAME.to_string(), - subjects: STREAM_SUBJECTS.iter().map(|s| s.to_string()).collect(), - retention: jetstream::stream::RetentionPolicy::WorkQueue, + subjects: vec![STREAM_SUBJECT.to_string()], + max_messages: MAX_MESSAGES, ..Default::default() } } @@ -40,7 +27,7 @@ pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainErro // Falls back to create if the stream doesn't exist yet. match js.update_stream(stream_config()).await { Ok(_) => { - tracing::info!(subjects = ?STREAM_SUBJECTS, "JetStream stream updated"); + tracing::info!(subject = STREAM_SUBJECT, "JetStream stream updated"); Ok(()) } Err(e) => { @@ -70,8 +57,10 @@ impl NatsTransport { #[async_trait] impl Transport for NatsTransport { async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { + // Prefix all subjects so they land inside the stream's subject filter. + let full_subject = format!("thoughts-events.{subject}"); self.jetstream - .publish(subject.to_string(), bytes.to_vec().into()) + .publish(full_subject, bytes.to_vec().into()) .await .map_err(|e| DomainError::Internal(e.to_string()))? .await // wait for server ack — confirms message is durably stored @@ -129,9 +118,7 @@ impl MessageSource for NatsMessageSource { CONSUMER_NAME, jetstream::consumer::pull::Config { durable_name: Some(CONSUMER_NAME.to_string()), - ack_policy: jetstream::consumer::AckPolicy::Explicit, - ack_wait: std::time::Duration::from_secs(ACK_WAIT_SECS), - max_deliver: MAX_DELIVER, + filter_subject: STREAM_SUBJECT.to_string(), ..Default::default() }, )