feat: search reindex, worker improvements, person IDs, user display names

- add admin POST /api/v1/admin/reindex-search endpoint + event-driven handler
- backfill persons from movie_cast/movie_crew into persons table
- paginate person list_page/backfill_from_credits_batch to cap memory
- concurrent worker event dispatch with semaphore (max 8)
- graceful worker shutdown (drain in-flight tasks on SIGINT)
- always ack events, log handler errors as warnings (no infinite retry)
- NATS ack_wait 600s, AtomicBool guard against concurrent reindex
- add username/display_name to UserSummaryDto and users list
- add person_id to CastMemberDto/CrewMemberDto via get_movie_profile use case
- add movie_id to wrapup MovieRef, person_id to wrapup PersonStat
- thread tmdb_person_id through wrapup cast pipeline
- add is_federated to FeedEntryDto
- cap orphaned persons query with LIMIT 500
- add SPA link to classic site footer
This commit is contained in:
2026-06-04 14:43:28 +02:00
parent af8e58aeb8
commit bd7dc648c4
36 changed files with 693 additions and 118 deletions

View File

@@ -20,3 +20,4 @@ pub mod test_helpers;
pub use movies::MovieDiscoveryIndexer;
pub use movies::SearchCleanupHandler;
pub use movies::SearchReindexHandler;

View File

@@ -0,0 +1,78 @@
use domain::{
errors::DomainError,
models::{CastMember, CrewMember, ExternalPersonId, MovieProfile, PersonId},
value_objects::MovieId,
};
use uuid::Uuid;
use crate::context::AppContext;
pub struct GetMovieProfileQuery {
pub movie_id: Uuid,
}
pub struct CastMemberWithId {
pub person_id: PersonId,
pub tmdb_person_id: u64,
pub name: String,
pub character: String,
pub billing_order: u32,
pub profile_path: Option<String>,
}
pub struct CrewMemberWithId {
pub person_id: PersonId,
pub tmdb_person_id: u64,
pub name: String,
pub job: String,
pub department: String,
pub profile_path: Option<String>,
}
pub struct MovieProfileResult {
pub profile: MovieProfile,
pub cast: Vec<CastMemberWithId>,
pub crew: Vec<CrewMemberWithId>,
}
fn resolve_cast(member: &CastMember) -> CastMemberWithId {
let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id));
CastMemberWithId {
person_id: PersonId::from_external(&ext),
tmdb_person_id: member.tmdb_person_id,
name: member.name.clone(),
character: member.character.clone(),
billing_order: member.billing_order,
profile_path: member.profile_path.clone(),
}
}
fn resolve_crew(member: &CrewMember) -> CrewMemberWithId {
let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id));
CrewMemberWithId {
person_id: PersonId::from_external(&ext),
tmdb_person_id: member.tmdb_person_id,
name: member.name.clone(),
job: member.job.clone(),
department: member.department.clone(),
profile_path: member.profile_path.clone(),
}
}
pub async fn execute(
ctx: &AppContext,
query: GetMovieProfileQuery,
) -> Result<Option<MovieProfileResult>, DomainError> {
let movie_id = MovieId::from_uuid(query.movie_id);
let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?;
Ok(profile.map(|p| {
let cast = p.cast.iter().map(resolve_cast).collect();
let crew = p.crew.iter().map(resolve_crew).collect();
MovieProfileResult {
profile: p,
cast,
crew,
}
}))
}

View File

@@ -1,10 +1,13 @@
pub mod commands;
pub mod discovery_indexer;
pub mod enrich_movie;
pub mod get_movie_profile;
pub mod get_movies;
pub mod queries;
pub mod reindex_search;
pub mod search_cleanup;
pub mod sync_poster;
pub use discovery_indexer::MovieDiscoveryIndexer;
pub use reindex_search::SearchReindexHandler;
pub use search_cleanup::SearchCleanupHandler;

View File

@@ -0,0 +1,165 @@
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
models::{IndexableDocument, MovieFilter, collections::PageParams},
ports::EventHandler,
};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::context::AppContext;
const BATCH_SIZE: u32 = 500;
pub struct SearchReindexHandler {
ctx: AppContext,
running: AtomicBool,
}
impl SearchReindexHandler {
pub fn new(ctx: AppContext) -> Self {
Self {
ctx,
running: AtomicBool::new(false),
}
}
}
#[async_trait]
impl EventHandler for SearchReindexHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
if !matches!(event, DomainEvent::SearchReindexRequested) {
return Ok(());
}
if self.running.swap(true, Ordering::SeqCst) {
tracing::info!("search reindex already running, skipping");
return Ok(());
}
let result = self.run_reindex().await;
self.running.store(false, Ordering::SeqCst);
result
}
}
impl SearchReindexHandler {
async fn run_reindex(&self) -> Result<(), DomainError> {
tracing::info!("search reindex started");
let movies_indexed = self.reindex_movies().await?;
let backfilled = self.backfill_persons().await?;
if backfilled > 0 {
tracing::info!(backfilled, "backfilled missing persons from credits");
}
let persons_indexed = self.reindex_persons().await?;
tracing::info!(movies_indexed, persons_indexed, "search reindex completed");
Ok(())
}
async fn reindex_movies(&self) -> Result<u64, DomainError> {
let mut count: u64 = 0;
let mut offset: u32 = 0;
loop {
let page = self
.ctx
.repos
.movie
.list_movies(
&PageParams {
limit: BATCH_SIZE,
offset,
},
&MovieFilter::default(),
)
.await?;
for summary in &page.items {
let movie_id = summary.movie.id().clone();
let profile = self
.ctx
.repos
.movie_profile
.get_by_movie_id(&movie_id)
.await?;
if let Err(e) = self
.ctx
.repos
.search_command
.index(IndexableDocument::Movie {
id: movie_id.clone(),
movie: Box::new(summary.movie.clone()),
profile: profile.map(Box::new),
})
.await
{
tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}");
}
count += 1;
}
if (page.items.len() as u32) < BATCH_SIZE {
break;
}
offset += BATCH_SIZE;
tokio::task::yield_now().await;
}
Ok(count)
}
async fn backfill_persons(&self) -> Result<u64, DomainError> {
let mut total = 0u64;
loop {
let (count, has_more) = self
.ctx
.repos
.person_command
.backfill_from_credits_batch(BATCH_SIZE)
.await?;
total += count;
if !has_more {
break;
}
tokio::task::yield_now().await;
}
Ok(total)
}
async fn reindex_persons(&self) -> Result<u64, DomainError> {
let mut count: u64 = 0;
let mut offset: u32 = 0;
loop {
let persons = self
.ctx
.repos
.person_query
.list_page(BATCH_SIZE, offset)
.await?;
for person in &persons {
if let Err(e) = self
.ctx
.repos
.search_command
.index(IndexableDocument::Person {
id: person.id().clone(),
person: Box::new(person.clone()),
})
.await
{
tracing::warn!(person = %person.name(), "reindex person failed: {e}");
}
count += 1;
}
if (persons.len() as u32) < BATCH_SIZE {
break;
}
offset += BATCH_SIZE;
tokio::task::yield_now().await;
}
Ok(count)
}
}

View File

@@ -61,6 +61,7 @@ impl EventHandler for RecordingHandler {
DomainEvent::WatchEventIngested { .. } => "watch_event_ingested",
DomainEvent::WrapUpRequested { .. } => "wrapup_requested",
DomainEvent::WrapUpCompleted { .. } => "wrapup_completed",
DomainEvent::SearchReindexRequested => "search_reindex",
};
self.calls.lock().unwrap().push(label);
Ok(())
@@ -85,34 +86,34 @@ async fn dispatches_to_all_handlers() {
};
WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)])
.run()
.run(tokio::sync::watch::channel(false).1)
.await;
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
}
#[tokio::test]
async fn nacks_when_handler_fails() {
let nack_called = Arc::new(Mutex::new(false));
async fn acks_even_when_handler_fails() {
let ack_called = Arc::new(Mutex::new(false));
struct TrackingAck {
nack_called: Arc<Mutex<bool>>,
ack_called: Arc<Mutex<bool>>,
}
#[async_trait]
impl AckHandle for TrackingAck {
async fn ack(&self) -> Result<(), DomainError> {
*self.ack_called.lock().unwrap() = true;
Ok(())
}
async fn nack(&self) -> Result<(), DomainError> {
*self.nack_called.lock().unwrap() = true;
Ok(())
}
}
struct TrackingConsumer {
event: DomainEvent,
nack_called: Arc<Mutex<bool>>,
ack_called: Arc<Mutex<bool>>,
}
impl EventConsumer for TrackingConsumer {
@@ -120,7 +121,7 @@ async fn nacks_when_handler_fails() {
let envelope = EventEnvelope::new(
self.event.clone(),
Box::new(TrackingAck {
nack_called: Arc::clone(&self.nack_called),
ack_called: Arc::clone(&self.ack_called),
}),
);
Box::pin(stream::iter(vec![Ok(envelope)]))
@@ -138,14 +139,14 @@ async fn nacks_when_handler_fails() {
let consumer = TrackingConsumer {
event: movie_discovered(),
nack_called: Arc::clone(&nack_called),
ack_called: Arc::clone(&ack_called),
};
WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)])
.run()
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(*nack_called.lock().unwrap());
assert!(*ack_called.lock().unwrap());
}
#[tokio::test]
@@ -189,7 +190,9 @@ async fn acks_when_all_handlers_succeed() {
ack_called: Arc::clone(&ack_called),
};
WorkerService::new(Arc::new(consumer), vec![]).run().await;
WorkerService::new(Arc::new(consumer), vec![])
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(*ack_called.lock().unwrap());
}

View File

@@ -5,47 +5,73 @@ use domain::{
ports::{EventConsumer, EventHandler},
};
use futures::StreamExt;
use tokio::sync::Semaphore;
const DEFAULT_CONCURRENCY: usize = 8;
pub struct WorkerService {
consumer: Arc<dyn EventConsumer>,
handlers: Vec<Arc<dyn EventHandler>>,
semaphore: Arc<Semaphore>,
}
impl WorkerService {
pub fn new(consumer: Arc<dyn EventConsumer>, handlers: Vec<Arc<dyn EventHandler>>) -> Self {
Self { consumer, handlers }
Self {
consumer,
handlers,
semaphore: Arc::new(Semaphore::new(DEFAULT_CONCURRENCY)),
}
}
pub async fn run(self) {
pub async fn run(self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
let handlers = Arc::new(self.handlers);
let mut tasks = tokio::task::JoinSet::new();
let mut stream = self.consumer.consume();
while let Some(result) = stream.next().await {
match result {
Ok(envelope) => {
tracing::info!(event = ?envelope.event, "received event");
self.dispatch(envelope).await;
}
Err(e) => tracing::error!("event consumer error: {e}"),
}
}
tracing::info!("event stream ended, worker shutting down");
}
async fn dispatch(&self, envelope: EventEnvelope) {
let mut all_ok = true;
for handler in &self.handlers {
if let Err(e) = handler.handle(&envelope.event).await {
tracing::error!("event handler error: {e}");
all_ok = false;
loop {
tokio::select! {
biased;
_ = shutdown.changed() => {
tracing::info!("shutdown signal received, stopping event consumption");
break;
}
item = stream.next() => {
match item {
Some(Ok(envelope)) => {
tracing::info!(event = ?envelope.event, "received event");
let permit = self.semaphore.clone().acquire_owned().await;
let Ok(permit) = permit else { break };
let h = Arc::clone(&handlers);
tasks.spawn(async move {
dispatch(h, envelope).await;
drop(permit);
});
}
Some(Err(e)) => tracing::error!("event consumer error: {e}"),
None => break,
}
}
}
}
let result = if all_ok {
envelope.ack().await
} else {
envelope.nack().await
};
if let Err(e) = result {
tracing::error!("ack/nack failed: {e}");
let in_flight = tasks.len();
if in_flight > 0 {
tracing::info!(in_flight, "draining in-flight tasks before shutdown");
}
while tasks.join_next().await.is_some() {}
tracing::info!("worker shut down gracefully");
}
}
async fn dispatch(handlers: Arc<Vec<Arc<dyn EventHandler>>>, envelope: EventEnvelope) {
for handler in handlers.iter() {
if let Err(e) = handler.handle(&envelope.event).await {
tracing::warn!("event handler error (non-fatal): {e}");
}
}
if let Err(e) = envelope.ack().await {
tracing::error!("ack failed: {e}");
}
}

View File

@@ -125,6 +125,7 @@ fn build_report(
fn movie_ref(r: &WrapUpMovieRow) -> MovieRef {
MovieRef {
movie_id: Some(r.movie_id),
title: r.title.clone(),
year: r.release_year,
runtime_minutes: r.runtime_minutes,
@@ -233,6 +234,7 @@ fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32) {
let count = ratings.len() as u32;
let avg = ratings.iter().map(|&r| r as f64).sum::<f64>() / ratings.len() as f64;
PersonStat {
person_id: None,
name,
count,
avg_rating: avg,
@@ -249,12 +251,16 @@ fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32) {
}
fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32, Vec<String>) {
use domain::models::{ExternalPersonId, PersonId};
let mut actor_movies: HashMap<String, Vec<u8>> = HashMap::new();
let mut actor_profiles: HashMap<String, Option<String>> = HashMap::new();
let mut actor_tmdb_ids: HashMap<String, i64> = HashMap::new();
for r in rows {
for (i, (name, billing)) in r.cast_names.iter().enumerate() {
for (i, (name, billing, tmdb_id)) in r.cast_names.iter().enumerate() {
if *billing <= 3 {
actor_movies.entry(name.clone()).or_default().push(r.rating);
actor_tmdb_ids.entry(name.clone()).or_insert(*tmdb_id);
if let Some(path) = r.cast_profile_paths.get(i) {
actor_profiles
.entry(name.clone())
@@ -269,7 +275,12 @@ fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32, Vec<St
.map(|(name, ratings)| {
let count = ratings.len() as u32;
let avg = ratings.iter().map(|&r| r as f64).sum::<f64>() / ratings.len() as f64;
let person_id = actor_tmdb_ids.get(&name).map(|tid| {
let ext = ExternalPersonId::new(format!("tmdb:{tid}"));
PersonId::from_external(&ext).value()
});
PersonStat {
person_id,
name,
count,
avg_rating: avg,

View File

@@ -26,7 +26,7 @@ fn make_row(title: &str, rating: u8, watched_at: &str) -> WrapUpMovieRow {
original_language: Some("en".to_string()),
genres: vec!["Action".to_string()],
keywords: vec!["heist".to_string()],
cast_names: vec![("Actor A".to_string(), 1)],
cast_names: vec![("Actor A".to_string(), 1, 12345)],
cast_profile_paths: vec![None],
}
}