From df7fcf509646070e66d67734ed77962afa076239 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 22:43:43 +0200 Subject: [PATCH] fix: add federation.> to NATS stream subjects; update stream on startup; truncate long profile URLs --- crates/adapters/nats/src/lib.rs | 27 ++++++++++++++----- .../components/remote-user-profile.tsx | 6 ++--- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index ae3fb26..afd1ae7 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -9,7 +9,14 @@ use std::sync::Arc; const STREAM_NAME: &str = "THOUGHTS_EVENTS"; // Explicit prefixes instead of ">" — NATS WorkQueue retention disallows // the catch-all ">" wildcard without also setting no_ack = true. -const STREAM_SUBJECTS: &[&str] = &["thoughts.>", "likes.>", "boosts.>", "follows.>", "users.>"]; +const STREAM_SUBJECTS: &[&str] = &[ + "thoughts.>", + "likes.>", + "boosts.>", + "follows.>", + "users.>", + "federation.>", +]; const CONSUMER_NAME: &str = "worker"; // Redelivery timeout: if a message is not acked within this time, NATS redelivers it. const ACK_WAIT_SECS: u64 = 30; @@ -25,14 +32,20 @@ fn stream_config() -> StreamConfig { } } -/// Ensure the JetStream stream exists. Call once at startup before publishing or consuming. -/// Idempotent — safe to call from both bootstrap and worker factories. +/// Ensure the JetStream stream exists and has the current subject list. +/// Idempotent — creates if absent, updates subjects if already present. pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> { let js = jetstream::new(client.clone()); - js.get_or_create_stream(stream_config()) - .await - .map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}")))?; - Ok(()) + // Try to update first (covers the case where stream exists with stale subjects). + // Falls back to create if the stream doesn't exist yet. + match js.update_stream(stream_config()).await { + Ok(_) => Ok(()), + Err(_) => js + .get_or_create_stream(stream_config()) + .await + .map(|_| ()) + .map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}"))), + } } // ── NatsTransport — JetStream publish ────────────────────────────────────── diff --git a/thoughts-frontend/components/remote-user-profile.tsx b/thoughts-frontend/components/remote-user-profile.tsx index 82f1389..b68a2c5 100644 --- a/thoughts-frontend/components/remote-user-profile.tsx +++ b/thoughts-frontend/components/remote-user-profile.tsx @@ -117,9 +117,9 @@ export function RemoteUserProfile({ size="sm" className="mt-4 w-full" > - - - View on {new URL(actor.url).hostname} + + + {new URL(actor.url).hostname}