- add admin POST /api/v1/admin/reindex-search endpoint + event-driven handler - backfill persons from movie_cast/movie_crew into persons table - paginate person list_page/backfill_from_credits_batch to cap memory - concurrent worker event dispatch with semaphore (max 8) - graceful worker shutdown (drain in-flight tasks on SIGINT) - always ack events, log handler errors as warnings (no infinite retry) - NATS ack_wait 600s, AtomicBool guard against concurrent reindex - add username/display_name to UserSummaryDto and users list - add person_id to CastMemberDto/CrewMemberDto via get_movie_profile use case - add movie_id to wrapup MovieRef, person_id to wrapup PersonStat - thread tmdb_person_id through wrapup cast pipeline - add is_federated to FeedEntryDto - cap orphaned persons query with LIMIT 500 - add SPA link to classic site footer
215 lines
7.2 KiB
Rust
215 lines
7.2 KiB
Rust
use async_nats::{
|
|
Client,
|
|
jetstream::{self, consumer::pull, message::AckKind, stream::Config as StreamConfig},
|
|
};
|
|
use async_trait::async_trait;
|
|
use domain::{
|
|
errors::DomainError,
|
|
events::{AckHandle, DomainEvent, EventEnvelope},
|
|
ports::EventConsumer,
|
|
};
|
|
use futures::{
|
|
StreamExt,
|
|
stream::{self, BoxStream},
|
|
};
|
|
use std::sync::Arc;
|
|
use tokio::sync::{Mutex, mpsc};
|
|
|
|
use crate::{config::NatsConfig, payload::NatsEventPayload, subject::consumer_subject_filter};
|
|
|
|
// ── JetStream ack handle ─────────────────────────────────────────────────────
|
|
|
|
struct NatsJetStreamAckHandle {
|
|
message: async_nats::jetstream::Message,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl AckHandle for NatsJetStreamAckHandle {
|
|
async fn ack(&self) -> Result<(), DomainError> {
|
|
tracing::debug!(
|
|
"acknowledging message with sequence {}",
|
|
self.message.info().unwrap().stream_sequence
|
|
);
|
|
|
|
self.message
|
|
.ack()
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
|
}
|
|
|
|
async fn nack(&self) -> Result<(), DomainError> {
|
|
tracing::debug!(
|
|
"negatively acknowledging message with sequence {}",
|
|
self.message.info().unwrap().stream_sequence
|
|
);
|
|
|
|
self.message
|
|
.ack_with(AckKind::Nak(None))
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
|
}
|
|
}
|
|
|
|
// ── Core NATS ack handle (no-op) ─────────────────────────────────────────────
|
|
|
|
struct NoopAck;
|
|
|
|
#[async_trait]
|
|
impl AckHandle for NoopAck {
|
|
async fn ack(&self) -> Result<(), DomainError> {
|
|
Ok(())
|
|
}
|
|
async fn nack(&self) -> Result<(), DomainError> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
// ── Envelope construction helpers ────────────────────────────────────────────
|
|
|
|
fn decode_js(msg: async_nats::jetstream::Message) -> Result<EventEnvelope, DomainError> {
|
|
let payload: NatsEventPayload = serde_json::from_slice(&msg.payload)
|
|
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
|
let event = DomainEvent::try_from(payload)?;
|
|
Ok(EventEnvelope::new(
|
|
event,
|
|
Box::new(NatsJetStreamAckHandle { message: msg }),
|
|
))
|
|
}
|
|
|
|
fn decode_core(msg: async_nats::Message) -> Result<EventEnvelope, DomainError> {
|
|
let payload: NatsEventPayload = serde_json::from_slice(&msg.payload)
|
|
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
|
let event = DomainEvent::try_from(payload)?;
|
|
Ok(EventEnvelope::new(event, Box::new(NoopAck)))
|
|
}
|
|
|
|
// ── Channel-bridge shared by both consumers ──────────────────────────────────
|
|
|
|
/// Shared receiving end of the channel that carries decoded envelopes (or
/// errors) from a background task to the stream returned by `consume`.
/// Wrapped in `Arc<Mutex<…>>` so it can be cloned into each stream.
type EnvelopeRx = Arc<Mutex<mpsc::Receiver<Result<EventEnvelope, DomainError>>>>;
|
|
|
|
fn consume_from_rx(rx: EnvelopeRx) -> BoxStream<'static, Result<EventEnvelope, DomainError>> {
|
|
Box::pin(stream::unfold(rx, |rx| async move {
|
|
let item = rx.lock().await.recv().await?;
|
|
Some((item, rx))
|
|
}))
|
|
}
|
|
|
|
// ── JetStream consumer ────────────────────────────────────────────────────────
|
|
|
|
pub struct NatsJetStreamConsumer {
|
|
rx: EnvelopeRx,
|
|
}
|
|
|
|
impl NatsJetStreamConsumer {
|
|
pub async fn create(cfg: &NatsConfig, client: Client) -> anyhow::Result<Self> {
|
|
let js = jetstream::new(client);
|
|
|
|
let stream = js
|
|
.get_or_create_stream(StreamConfig {
|
|
name: cfg.stream_name.clone(),
|
|
subjects: vec![consumer_subject_filter(&cfg.subject_prefix)],
|
|
max_messages: 100_000,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
|
|
let subject_filter = consumer_subject_filter(&cfg.subject_prefix);
|
|
let consumer = stream
|
|
.get_or_create_consumer(
|
|
cfg.consumer_name.as_str(),
|
|
pull::Config {
|
|
durable_name: Some(cfg.consumer_name.clone()),
|
|
filter_subject: subject_filter,
|
|
ack_wait: std::time::Duration::from_secs(600),
|
|
..Default::default()
|
|
},
|
|
)
|
|
.await?;
|
|
|
|
let (tx, rx) = mpsc::channel(128);
|
|
|
|
tokio::spawn(async move {
|
|
loop {
|
|
let mut messages = match consumer.messages().await {
|
|
Err(e) => {
|
|
tracing::error!("failed to fetch messages: {}", e);
|
|
|
|
let _ = tx
|
|
.send(Err(DomainError::InfrastructureError(e.to_string())))
|
|
.await;
|
|
return;
|
|
}
|
|
Ok(m) => m,
|
|
};
|
|
while let Some(result) = messages.next().await {
|
|
let envelope = result
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
|
.and_then(decode_js);
|
|
|
|
if tx.send(envelope).await.is_err() {
|
|
tracing::info!("consumer channel closed, stopping message processing");
|
|
return;
|
|
}
|
|
|
|
tracing::debug!("message sent to consumer channel");
|
|
}
|
|
// messages() stream ended (fetch expired in strict mode) — restart
|
|
}
|
|
});
|
|
|
|
Ok(Self {
|
|
rx: Arc::new(Mutex::new(rx)),
|
|
})
|
|
}
|
|
}
|
|
|
|
impl EventConsumer for NatsJetStreamConsumer {
|
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
|
consume_from_rx(Arc::clone(&self.rx))
|
|
}
|
|
}
|
|
|
|
// ── Core NATS consumer ────────────────────────────────────────────────────────
|
|
|
|
pub struct NatsCoreConsumer {
|
|
rx: EnvelopeRx,
|
|
}
|
|
|
|
impl NatsCoreConsumer {
|
|
pub async fn create(cfg: &NatsConfig, client: Client) -> anyhow::Result<Self> {
|
|
let subject = consumer_subject_filter(&cfg.subject_prefix);
|
|
let mut subscriber = client.subscribe(subject).await?;
|
|
|
|
let (tx, rx) = mpsc::channel(128);
|
|
|
|
tokio::spawn(async move {
|
|
while let Some(msg) = subscriber.next().await {
|
|
let envelope = decode_core(msg);
|
|
|
|
tracing::debug!("message received and decoded, sending to consumer channel");
|
|
|
|
if tx.send(envelope).await.is_err() {
|
|
tracing::info!("consumer channel closed, stopping message processing");
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(Self {
|
|
rx: Arc::new(Mutex::new(rx)),
|
|
})
|
|
}
|
|
}
|
|
|
|
impl EventConsumer for NatsCoreConsumer {
|
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
|
consume_from_rx(Arc::clone(&self.rx))
|
|
}
|
|
}
|
|
|
|
fn _assert_send_sync() {
|
|
fn check<T: Send + Sync>() {}
|
|
check::<NatsJetStreamConsumer>();
|
|
check::<NatsCoreConsumer>();
|
|
}
|