fix: add federation.> to NATS stream subjects; update stream on startup; truncate long profile URLs
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m48s
test / unit (pull_request) Failing after 10m52s
test / integration (pull_request) Failing after 16m50s
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m48s
test / unit (pull_request) Failing after 10m52s
test / integration (pull_request) Failing after 16m50s
This commit is contained in:
@@ -9,7 +9,14 @@ use std::sync::Arc;
|
|||||||
const STREAM_NAME: &str = "THOUGHTS_EVENTS";
|
const STREAM_NAME: &str = "THOUGHTS_EVENTS";
|
||||||
// Explicit prefixes instead of ">" — NATS WorkQueue retention disallows
|
// Explicit prefixes instead of ">" — NATS WorkQueue retention disallows
|
||||||
// the catch-all ">" wildcard without also setting no_ack = true.
|
// the catch-all ">" wildcard without also setting no_ack = true.
|
||||||
const STREAM_SUBJECTS: &[&str] = &["thoughts.>", "likes.>", "boosts.>", "follows.>", "users.>"];
|
const STREAM_SUBJECTS: &[&str] = &[
|
||||||
|
"thoughts.>",
|
||||||
|
"likes.>",
|
||||||
|
"boosts.>",
|
||||||
|
"follows.>",
|
||||||
|
"users.>",
|
||||||
|
"federation.>",
|
||||||
|
];
|
||||||
const CONSUMER_NAME: &str = "worker";
|
const CONSUMER_NAME: &str = "worker";
|
||||||
// Redelivery timeout: if a message is not acked within this time, NATS redelivers it.
|
// Redelivery timeout: if a message is not acked within this time, NATS redelivers it.
|
||||||
const ACK_WAIT_SECS: u64 = 30;
|
const ACK_WAIT_SECS: u64 = 30;
|
||||||
@@ -25,14 +32,20 @@ fn stream_config() -> StreamConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ensure the JetStream stream exists. Call once at startup before publishing or consuming.
|
/// Ensure the JetStream stream exists and has the current subject list.
|
||||||
/// Idempotent — safe to call from both bootstrap and worker factories.
|
/// Idempotent — creates if absent, updates subjects if already present.
|
||||||
pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> {
|
pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> {
|
||||||
let js = jetstream::new(client.clone());
|
let js = jetstream::new(client.clone());
|
||||||
js.get_or_create_stream(stream_config())
|
// Try to update first (covers the case where stream exists with stale subjects).
|
||||||
.await
|
// Falls back to create if the stream doesn't exist yet.
|
||||||
.map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}")))?;
|
match js.update_stream(stream_config()).await {
|
||||||
Ok(())
|
Ok(_) => Ok(()),
|
||||||
|
Err(_) => js
|
||||||
|
.get_or_create_stream(stream_config())
|
||||||
|
.await
|
||||||
|
.map(|_| ())
|
||||||
|
.map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}"))),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── NatsTransport — JetStream publish ──────────────────────────────────────
|
// ── NatsTransport — JetStream publish ──────────────────────────────────────
|
||||||
|
|||||||
@@ -117,9 +117,9 @@ export function RemoteUserProfile({
|
|||||||
size="sm"
|
size="sm"
|
||||||
className="mt-4 w-full"
|
className="mt-4 w-full"
|
||||||
>
|
>
|
||||||
<Link href={actor.url} target="_blank" rel="noopener noreferrer">
|
<Link href={actor.url} target="_blank" rel="noopener noreferrer" className="flex items-center overflow-hidden">
|
||||||
<ExternalLink className="mr-2 h-4 w-4" />
|
<ExternalLink className="mr-2 h-4 w-4 shrink-0" />
|
||||||
View on {new URL(actor.url).hostname}
|
<span className="truncate">{new URL(actor.url).hostname}</span>
|
||||||
</Link>
|
</Link>
|
||||||
</Button>
|
</Button>
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user