fix(nats): align with movies-diary — Limits retention, single wildcard subject, filter_subject on consumer, prefixed publish
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Has been cancelled
test / unit (pull_request) Has been cancelled
test / integration (pull_request) Has been cancelled

This commit is contained in:
2026-05-14 23:17:57 +02:00
parent 55c55424b5
commit 0caca58c1c

View File

@@ -5,29 +5,16 @@ use event_transport::{MessageSource, RawMessage, Transport};
use futures::stream::BoxStream; use futures::stream::BoxStream;
use std::sync::Arc; use std::sync::Arc;
// Stream name and subjects used by both publisher and consumer.
const STREAM_NAME: &str = "THOUGHTS_EVENTS"; const STREAM_NAME: &str = "THOUGHTS_EVENTS";
// Explicit prefixes instead of ">" — NATS WorkQueue retention disallows const STREAM_SUBJECT: &str = "thoughts-events.>";
// the catch-all ">" wildcard without also setting no_ack = true.
const STREAM_SUBJECTS: &[&str] = &[
"thoughts.>",
"likes.>",
"boosts.>",
"follows.>",
"users.>",
"federation.>",
];
const CONSUMER_NAME: &str = "worker"; const CONSUMER_NAME: &str = "worker";
// Redelivery timeout: if a message is not acked within this time, NATS redelivers it. const MAX_MESSAGES: i64 = 100_000;
const ACK_WAIT_SECS: u64 = 30;
// Maximum delivery attempts before the message goes to a dead-letter stream (if configured).
const MAX_DELIVER: i64 = 5;
fn stream_config() -> StreamConfig { fn stream_config() -> StreamConfig {
StreamConfig { StreamConfig {
name: STREAM_NAME.to_string(), name: STREAM_NAME.to_string(),
subjects: STREAM_SUBJECTS.iter().map(|s| s.to_string()).collect(), subjects: vec![STREAM_SUBJECT.to_string()],
retention: jetstream::stream::RetentionPolicy::WorkQueue, max_messages: MAX_MESSAGES,
..Default::default() ..Default::default()
} }
} }
@@ -40,7 +27,7 @@ pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainErro
// Falls back to create if the stream doesn't exist yet. // Falls back to create if the stream doesn't exist yet.
match js.update_stream(stream_config()).await { match js.update_stream(stream_config()).await {
Ok(_) => { Ok(_) => {
tracing::info!(subjects = ?STREAM_SUBJECTS, "JetStream stream updated"); tracing::info!(subject = STREAM_SUBJECT, "JetStream stream updated");
Ok(()) Ok(())
} }
Err(e) => { Err(e) => {
@@ -70,8 +57,10 @@ impl NatsTransport {
#[async_trait] #[async_trait]
impl Transport for NatsTransport { impl Transport for NatsTransport {
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
// Prefix all subjects so they land inside the stream's subject filter.
let full_subject = format!("thoughts-events.{subject}");
self.jetstream self.jetstream
.publish(subject.to_string(), bytes.to_vec().into()) .publish(full_subject, bytes.to_vec().into())
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))? .map_err(|e| DomainError::Internal(e.to_string()))?
.await // wait for server ack — confirms message is durably stored .await // wait for server ack — confirms message is durably stored
@@ -129,9 +118,7 @@ impl MessageSource for NatsMessageSource {
CONSUMER_NAME, CONSUMER_NAME,
jetstream::consumer::pull::Config { jetstream::consumer::pull::Config {
durable_name: Some(CONSUMER_NAME.to_string()), durable_name: Some(CONSUMER_NAME.to_string()),
ack_policy: jetstream::consumer::AckPolicy::Explicit, filter_subject: STREAM_SUBJECT.to_string(),
ack_wait: std::time::Duration::from_secs(ACK_WAIT_SECS),
max_deliver: MAX_DELIVER,
..Default::default() ..Default::default()
}, },
) )