From bd7dc648c4cf0f925ed31b1bf5f47e198949740f Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 4 Jun 2026 14:43:28 +0200 Subject: [PATCH] feat: search reindex, worker improvements, person IDs, user display names - add admin POST /api/v1/admin/reindex-search endpoint + event-driven handler - backfill persons from movie_cast/movie_crew into persons table - paginate person list_page/backfill_from_credits_batch to cap memory - concurrent worker event dispatch with semaphore (max 8) - graceful worker shutdown (drain in-flight tasks on SIGINT) - always ack events, log handler errors as warnings (no infinite retry) - NATS ack_wait 600s, AtomicBool guard against concurrent reindex - add username/display_name to UserSummaryDto and users list - add person_id to CastMemberDto/CrewMemberDto via get_movie_profile use case - add movie_id to wrapup MovieRef, person_id to wrapup PersonStat - thread tmdb_person_id through wrapup cast pipeline - add is_federated to FeedEntryDto - cap orphaned persons query with LIMIT 500 - add SPA link to classic site footer --- crates/adapters/event-payload/src/lib.rs | 4 + crates/adapters/nats/src/consumer.rs | 1 + crates/adapters/nats/src/subject.rs | 1 + crates/adapters/postgres/src/models.rs | 6 +- crates/adapters/postgres/src/persons.rs | 91 +++++++++- crates/adapters/postgres/src/users.rs | 4 +- crates/adapters/postgres/src/wrapup.rs | 9 +- crates/adapters/sqlite/src/models.rs | 6 +- crates/adapters/sqlite/src/persons.rs | 70 +++++++- crates/adapters/sqlite/src/users.rs | 12 +- crates/adapters/sqlite/src/wrapup.rs | 9 +- .../template-askama/templates/base.html | 2 + crates/api-types/src/diary.rs | 1 + crates/api-types/src/movies.rs | 2 + crates/api-types/src/users.rs | 2 + crates/application/src/lib.rs | 1 + .../src/movies/get_movie_profile.rs | 78 +++++++++ crates/application/src/movies/mod.rs | 3 + .../application/src/movies/reindex_search.rs | 165 ++++++++++++++++++ crates/application/src/tests/worker.rs | 25 +-- crates/application/src/worker.rs | 78 ++++++--- crates/application/src/wrapup/compute.rs | 13 +- .../application/src/wrapup/tests/compute.rs | 2 +- crates/domain/src/events.rs | 1 + crates/domain/src/models/mod.rs | 12 ++ crates/domain/src/models/wrapup.rs | 2 + crates/domain/src/ports.rs | 9 +- crates/domain/src/testing.rs | 10 ++ crates/presentation/src/handlers/api.rs | 132 ++++++++------ crates/presentation/src/mappers/diary.rs | 1 + crates/presentation/src/mappers/users.rs | 5 +- crates/presentation/src/routes.rs | 4 + crates/presentation/src/tests/api_handlers.rs | 7 + crates/presentation/src/tests/extractors.rs | 13 ++ crates/presentation/tests/api_test.rs | 13 ++ crates/worker/src/main.rs | 17 +- 36 files changed, 693 insertions(+), 118 deletions(-) create mode 100644 crates/application/src/movies/get_movie_profile.rs create mode 100644 crates/application/src/movies/reindex_search.rs diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 248e1db..8d394c2 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -86,6 +86,7 @@ pub enum EventPayload { WrapUpCompleted { wrapup_id: String, }, + SearchReindexRequested, } impl EventPayload { @@ -107,6 +108,7 @@ impl EventPayload { EventPayload::WatchEventIngested { .. } => "WatchEventIngested", EventPayload::WrapUpRequested { .. } => "WrapUpRequested", EventPayload::WrapUpCompleted { .. } => "WrapUpCompleted", + EventPayload::SearchReindexRequested => "SearchReindexRequested", } } } @@ -248,6 +250,7 @@ impl From<&DomainEvent> for EventPayload { DomainEvent::WrapUpCompleted { wrapup_id } => EventPayload::WrapUpCompleted { wrapup_id: wrapup_id.value().to_string(), }, + DomainEvent::SearchReindexRequested => EventPayload::SearchReindexRequested, } } } @@ -398,6 +401,7 @@ impl TryFrom for DomainEvent { wrapup_id: WrapUpId::from_uuid(wid), }) } + EventPayload::SearchReindexRequested => Ok(DomainEvent::SearchReindexRequested), } } } diff --git a/crates/adapters/nats/src/consumer.rs b/crates/adapters/nats/src/consumer.rs index caca808..33c4de0 100644 --- a/crates/adapters/nats/src/consumer.rs +++ b/crates/adapters/nats/src/consumer.rs @@ -120,6 +120,7 @@ impl NatsJetStreamConsumer { pull::Config { durable_name: Some(cfg.consumer_name.clone()), filter_subject: subject_filter, + ack_wait: std::time::Duration::from_secs(600), ..Default::default() }, ) diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index a180920..1f1f127 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -18,6 +18,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::WatchEventIngested { .. } => "watch.event.ingested", DomainEvent::WrapUpRequested { .. } => "wrapup.requested", DomainEvent::WrapUpCompleted { .. } => "wrapup.completed", + DomainEvent::SearchReindexRequested => "search.reindex.requested", }; format!("{prefix}.{suffix}") } diff --git a/crates/adapters/postgres/src/models.rs b/crates/adapters/postgres/src/models.rs index 2c1a6dd..373e076 100644 --- a/crates/adapters/postgres/src/models.rs +++ b/crates/adapters/postgres/src/models.rs @@ -7,7 +7,7 @@ use domain::{ }, value_objects::{ Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear, - ReviewId, UserId, + ReviewId, UserId, Username, }, }; use uuid::Uuid; @@ -237,6 +237,8 @@ impl MovieStatsRow { pub(crate) struct UserSummaryRow { pub id: String, pub email: String, + pub username: String, + pub display_name: Option, pub total_movies: i64, pub avg_rating: Option, pub avatar_path: Option, @@ -247,6 +249,8 @@ impl UserSummaryRow { Ok(UserSummary::new( UserId::from_uuid(parse_uuid(&self.id)?), Email::new(self.email)?, + Username::new(self.username)?, + self.display_name, self.total_movies, self.avg_rating, self.avatar_path, diff --git a/crates/adapters/postgres/src/persons.rs b/crates/adapters/postgres/src/persons.rs index d87b3f6..978900e 100644 --- a/crates/adapters/postgres/src/persons.rs +++ b/crates/adapters/postgres/src/persons.rs @@ -57,6 +57,60 @@ impl PersonCommand for PostgresPersonAdapter { } Ok(()) } + + async fn backfill_from_credits_batch( + &self, + batch_size: u32, + ) -> Result<(u64, bool), DomainError> { + #[derive(sqlx::FromRow)] + struct MissingPerson { + tmdb_person_id: i64, + name: String, + department: Option, + profile_path: Option, + } + + let rows = sqlx::query_as::<_, MissingPerson>( + "SELECT mc.tmdb_person_id, mc.name, 'Acting' AS department, mc.profile_path + FROM movie_cast mc + WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id) + GROUP BY mc.tmdb_person_id, mc.name, mc.profile_path + UNION ALL + SELECT mc.tmdb_person_id, mc.name, mc.department, mc.profile_path + FROM movie_crew mc + WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id) + AND NOT EXISTS (SELECT 1 FROM movie_cast c2 WHERE c2.tmdb_person_id = mc.tmdb_person_id) + GROUP BY mc.tmdb_person_id, mc.name, mc.department, mc.profile_path + LIMIT $1", + ) + .bind(batch_size as i64) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + let has_more = rows.len() as u32 >= batch_size; + let mut count = 0u64; + for row in &rows { + let ext = ExternalPersonId::new(format!("tmdb:{}", row.tmdb_person_id)); + let pid = PersonId::from_external(&ext); + sqlx::query( + "INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT(tmdb_person_id) DO NOTHING", + ) + .bind(pid.value().to_string()) + .bind(ext.value()) + .bind(row.tmdb_person_id) + .bind(&row.name) + .bind(&row.department) + .bind(&row.profile_path) + .execute(&self.pool) + .await + .map_err(map_err)?; + count += 1; + } + Ok((count, has_more)) + } } #[async_trait] @@ -206,6 +260,40 @@ impl PersonQuery for PostgresPersonAdapter { Ok(PersonCredits { person, cast, crew }) } + async fn list_page(&self, limit: u32, offset: u32) -> Result, DomainError> { + #[derive(sqlx::FromRow)] + struct Row { + id: String, + external_id: String, + name: String, + known_for_department: Option, + profile_path: Option, + } + + let rows = sqlx::query_as::<_, Row>( + "SELECT id, external_id, name, known_for_department, profile_path FROM persons ORDER BY id LIMIT $1 OFFSET $2", + ) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + Ok(rows + .into_iter() + .map(|r| { + let ext = ExternalPersonId::new(r.external_id); + Person::new( + PersonId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + ext, + r.name, + r.known_for_department, + r.profile_path, + ) + }) + .collect()) + } + async fn list_orphaned_persons(&self) -> Result, DomainError> { let rows: Vec<(String,)> = sqlx::query_as( "SELECT id FROM persons @@ -214,7 +302,8 @@ impl PersonQuery for PostgresPersonAdapter { ) AND NOT EXISTS ( SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id - )", + ) + LIMIT 500", ) .fetch_all(&self.pool) .await diff --git a/crates/adapters/postgres/src/users.rs b/crates/adapters/postgres/src/users.rs index 8267426..e496254 100644 --- a/crates/adapters/postgres/src/users.rs +++ b/crates/adapters/postgres/src/users.rs @@ -186,13 +186,13 @@ impl UserRepository for PostgresUserRepository { async fn list_with_stats(&self) -> Result, DomainError> { sqlx::query_as::<_, UserSummaryRow>( - r#"SELECT u.id, u.email, + r#"SELECT u.id, u.email, u.username, u.display_name, COUNT(DISTINCT r.movie_id) AS total_movies, AVG(r.rating::float) AS avg_rating, u.avatar_path FROM users u LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL - GROUP BY u.id, u.email, u.avatar_path + GROUP BY u.id, u.email, u.username, u.display_name, u.avatar_path ORDER BY u.email ASC"#, ) .fetch_all(&self.pool) diff --git a/crates/adapters/postgres/src/wrapup.rs b/crates/adapters/postgres/src/wrapup.rs index 8815ef9..ff41f17 100644 --- a/crates/adapters/postgres/src/wrapup.rs +++ b/crates/adapters/postgres/src/wrapup.rs @@ -333,9 +333,9 @@ impl WrapUpStatsQuery for PostgresWrapUpStatsQuery { let keywords = keywords_map.get(&movie_id_str).cloned().unwrap_or_default(); let cast = cast_map.get(&movie_id_str).cloned().unwrap_or_default(); - let cast_names: Vec<(String, u32)> = cast + let cast_names: Vec<(String, u32, i64)> = cast .iter() - .map(|c| (c.name.clone(), c.billing_order)) + .map(|c| (c.name.clone(), c.billing_order, c.tmdb_person_id)) .collect(); let cast_profile_paths: Vec> = cast.iter().map(|c| c.profile_path.clone()).collect(); @@ -367,6 +367,7 @@ impl WrapUpStatsQuery for PostgresWrapUpStatsQuery { struct CastEntry { name: String, billing_order: u32, + tmdb_person_id: i64, profile_path: Option, } @@ -417,7 +418,7 @@ async fn fetch_cast_pg( movie_ids: &[String], ) -> Result>, DomainError> { let rows = sqlx::query( - "SELECT movie_id, name, billing_order, profile_path \ + "SELECT movie_id, name, billing_order, tmdb_person_id, profile_path \ FROM movie_cast \ WHERE movie_id = ANY($1) AND billing_order <= 3 \ ORDER BY billing_order ASC", @@ -432,10 +433,12 @@ async fn fetch_cast_pg( let mid: String = row.try_get("movie_id").map_err(map_err)?; let name: String = row.try_get("name").map_err(map_err)?; let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?; + let tmdb_person_id: i64 = row.try_get("tmdb_person_id").map_err(map_err)?; let profile_path: Option = row.try_get("profile_path").map_err(map_err)?; map.entry(mid).or_default().push(CastEntry { name, billing_order: billing_order as u32, + tmdb_person_id, profile_path, }); } diff --git a/crates/adapters/sqlite/src/models.rs b/crates/adapters/sqlite/src/models.rs index 3212f19..95df579 100644 --- a/crates/adapters/sqlite/src/models.rs +++ b/crates/adapters/sqlite/src/models.rs @@ -7,7 +7,7 @@ use domain::{ }, value_objects::{ Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear, - ReviewId, UserId, WatchlistEntryId, + ReviewId, UserId, Username, WatchlistEntryId, }, }; use uuid::Uuid; @@ -245,6 +245,8 @@ impl FeedRow { pub(crate) struct UserSummaryRow { pub id: String, pub email: String, + pub username: String, + pub display_name: Option, pub total_movies: i64, pub avg_rating: Option, pub avatar_path: Option, @@ -255,6 +257,8 @@ impl UserSummaryRow { Ok(UserSummary::new( UserId::from_uuid(parse_uuid(&self.id)?), Email::new(self.email)?, + Username::new(self.username)?, + self.display_name, self.total_movies, self.avg_rating, self.avatar_path, diff --git a/crates/adapters/sqlite/src/persons.rs b/crates/adapters/sqlite/src/persons.rs index 2ec373b..04d6be5 100644 --- a/crates/adapters/sqlite/src/persons.rs +++ b/crates/adapters/sqlite/src/persons.rs @@ -57,6 +57,60 @@ impl PersonCommand for SqlitePersonAdapter { } Ok(()) } + + async fn backfill_from_credits_batch( + &self, + batch_size: u32, + ) -> Result<(u64, bool), DomainError> { + #[derive(sqlx::FromRow)] + struct MissingPerson { + tmdb_person_id: i64, + name: String, + department: Option, + profile_path: Option, + } + + let rows = sqlx::query_as::<_, MissingPerson>( + "SELECT mc.tmdb_person_id, mc.name, 'Acting' AS department, mc.profile_path + FROM movie_cast mc + WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id) + GROUP BY mc.tmdb_person_id + UNION ALL + SELECT mc.tmdb_person_id, mc.name, mc.department, mc.profile_path + FROM movie_crew mc + WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id) + AND NOT EXISTS (SELECT 1 FROM movie_cast c2 WHERE c2.tmdb_person_id = mc.tmdb_person_id) + GROUP BY mc.tmdb_person_id + LIMIT ?", + ) + .bind(batch_size) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + let has_more = rows.len() as u32 >= batch_size; + let mut count = 0u64; + for row in &rows { + let ext = ExternalPersonId::new(format!("tmdb:{}", row.tmdb_person_id)); + let pid = PersonId::from_external(&ext); + sqlx::query( + "INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(tmdb_person_id) DO NOTHING", + ) + .bind(pid.value().to_string()) + .bind(ext.value()) + .bind(row.tmdb_person_id) + .bind(&row.name) + .bind(&row.department) + .bind(&row.profile_path) + .execute(&self.pool) + .await + .map_err(map_err)?; + count += 1; + } + Ok((count, has_more)) + } } #[async_trait] @@ -156,6 +210,19 @@ impl PersonQuery for SqlitePersonAdapter { Ok(PersonCredits { person, cast, crew }) } + async fn list_page(&self, limit: u32, offset: u32) -> Result, DomainError> { + let rows = sqlx::query_as::<_, PersonRow>( + "SELECT id, external_id, name, known_for_department, profile_path FROM persons ORDER BY id LIMIT ? OFFSET ?", + ) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + Ok(rows.into_iter().map(PersonRow::into_person).collect()) + } + async fn list_orphaned_persons(&self) -> Result, DomainError> { let rows: Vec<(String,)> = sqlx::query_as( "SELECT id FROM persons @@ -164,7 +231,8 @@ impl PersonQuery for SqlitePersonAdapter { ) AND NOT EXISTS ( SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id - )", + ) + LIMIT 500", ) .fetch_all(&self.pool) .await diff --git a/crates/adapters/sqlite/src/users.rs b/crates/adapters/sqlite/src/users.rs index cbc5527..74f48ea 100644 --- a/crates/adapters/sqlite/src/users.rs +++ b/crates/adapters/sqlite/src/users.rs @@ -182,17 +182,15 @@ impl UserRepository for SqliteUserRepository { } async fn list_with_stats(&self) -> Result, DomainError> { - sqlx::query_as!( - UserSummaryRow, - r#"SELECT u.id AS "id!: String", - u.email AS "email!: String", - COUNT(DISTINCT r.movie_id) AS "total_movies!: i64", + sqlx::query_as::<_, UserSummaryRow>( + r#"SELECT u.id, u.email, u.username, u.display_name, + COUNT(DISTINCT r.movie_id) AS total_movies, AVG(CAST(r.rating AS REAL)) AS avg_rating, u.avatar_path FROM users u LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL - GROUP BY u.id, u.email, u.avatar_path - ORDER BY u.email ASC"# + GROUP BY u.id, u.email, u.username, u.display_name, u.avatar_path + ORDER BY u.email ASC"#, ) .fetch_all(&self.pool) .await diff --git a/crates/adapters/sqlite/src/wrapup.rs b/crates/adapters/sqlite/src/wrapup.rs index 84fa840..eb3d675 100644 --- a/crates/adapters/sqlite/src/wrapup.rs +++ b/crates/adapters/sqlite/src/wrapup.rs @@ -345,9 +345,9 @@ impl WrapUpStatsQuery for SqliteWrapUpStatsQuery { let keywords = keywords_map.get(&movie_id_str).cloned().unwrap_or_default(); let cast = cast_map.get(&movie_id_str).cloned().unwrap_or_default(); - let cast_names: Vec<(String, u32)> = cast + let cast_names: Vec<(String, u32, i64)> = cast .iter() - .map(|c| (c.name.clone(), c.billing_order)) + .map(|c| (c.name.clone(), c.billing_order, c.tmdb_person_id)) .collect(); let cast_profile_paths: Vec> = cast.iter().map(|c| c.profile_path.clone()).collect(); @@ -379,6 +379,7 @@ impl WrapUpStatsQuery for SqliteWrapUpStatsQuery { struct CastEntry { name: String, billing_order: u32, + tmdb_person_id: i64, profile_path: Option, } @@ -453,7 +454,7 @@ async fn fetch_cast_sqlite( return Ok(HashMap::new()); } let sql = format!( - "SELECT movie_id, name, billing_order, profile_path \ + "SELECT movie_id, name, billing_order, tmdb_person_id, profile_path \ FROM movie_cast \ WHERE movie_id IN ({}) AND billing_order <= 3 \ ORDER BY billing_order ASC", @@ -470,10 +471,12 @@ async fn fetch_cast_sqlite( let mid: String = row.try_get("movie_id").map_err(map_err)?; let name: String = row.try_get("name").map_err(map_err)?; let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?; + let tmdb_person_id: i64 = row.try_get("tmdb_person_id").map_err(map_err)?; let profile_path: Option = row.try_get("profile_path").map_err(map_err)?; map.entry(mid).or_default().push(CastEntry { name, billing_order: billing_order as u32, + tmdb_person_id, profile_path, }); } diff --git a/crates/adapters/template-askama/templates/base.html b/crates/adapters/template-askama/templates/base.html index 0b33b69..a71f8eb 100644 --- a/crates/adapters/template-askama/templates/base.html +++ b/crates/adapters/template-askama/templates/base.html @@ -63,6 +63,8 @@ {% endif %} · API Docs + · + Mobile App diff --git a/crates/api-types/src/diary.rs b/crates/api-types/src/diary.rs index ba88558..bc5d86d 100644 --- a/crates/api-types/src/diary.rs +++ b/crates/api-types/src/diary.rs @@ -57,6 +57,7 @@ pub struct FeedEntryDto { pub user_id: Uuid, pub user_email: String, pub user_display_name: String, + pub is_federated: bool, } #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] diff --git a/crates/api-types/src/movies.rs b/crates/api-types/src/movies.rs index 7e8f82c..36ca244 100644 --- a/crates/api-types/src/movies.rs +++ b/crates/api-types/src/movies.rs @@ -40,6 +40,7 @@ pub struct KeywordDto { #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct CastMemberDto { + pub person_id: String, pub tmdb_person_id: u64, pub name: String, pub character: String, @@ -49,6 +50,7 @@ pub struct CastMemberDto { #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct CrewMemberDto { + pub person_id: String, pub tmdb_person_id: u64, pub name: String, pub job: String, diff --git a/crates/api-types/src/users.rs b/crates/api-types/src/users.rs index 654ebfe..13ab95a 100644 --- a/crates/api-types/src/users.rs +++ b/crates/api-types/src/users.rs @@ -7,6 +7,8 @@ use crate::diary::{DiaryEntryDto, DiaryResponse}; pub struct UserSummaryDto { pub id: Uuid, pub email: String, + pub username: String, + pub display_name: Option, pub total_movies: i64, pub avg_rating: Option, } diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index c25f4c8..35aa179 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -20,3 +20,4 @@ pub mod test_helpers; pub use movies::MovieDiscoveryIndexer; pub use movies::SearchCleanupHandler; +pub use movies::SearchReindexHandler; diff --git a/crates/application/src/movies/get_movie_profile.rs b/crates/application/src/movies/get_movie_profile.rs new file mode 100644 index 0000000..2ddf828 --- /dev/null +++ b/crates/application/src/movies/get_movie_profile.rs @@ -0,0 +1,78 @@ +use domain::{ + errors::DomainError, + models::{CastMember, CrewMember, ExternalPersonId, MovieProfile, PersonId}, + value_objects::MovieId, +}; +use uuid::Uuid; + +use crate::context::AppContext; + +pub struct GetMovieProfileQuery { + pub movie_id: Uuid, +} + +pub struct CastMemberWithId { + pub person_id: PersonId, + pub tmdb_person_id: u64, + pub name: String, + pub character: String, + pub billing_order: u32, + pub profile_path: Option, +} + +pub struct CrewMemberWithId { + pub person_id: PersonId, + pub tmdb_person_id: u64, + pub name: String, + pub job: String, + pub department: String, + pub profile_path: Option, +} + +pub struct MovieProfileResult { + pub profile: MovieProfile, + pub cast: Vec, + pub crew: Vec, +} + +fn resolve_cast(member: &CastMember) -> CastMemberWithId { + let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id)); + CastMemberWithId { + person_id: PersonId::from_external(&ext), + tmdb_person_id: member.tmdb_person_id, + name: member.name.clone(), + character: member.character.clone(), + billing_order: member.billing_order, + profile_path: member.profile_path.clone(), + } +} + +fn resolve_crew(member: &CrewMember) -> CrewMemberWithId { + let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id)); + CrewMemberWithId { + person_id: PersonId::from_external(&ext), + tmdb_person_id: member.tmdb_person_id, + name: member.name.clone(), + job: member.job.clone(), + department: member.department.clone(), + profile_path: member.profile_path.clone(), + } +} + +pub async fn execute( + ctx: &AppContext, + query: GetMovieProfileQuery, +) -> Result, DomainError> { + let movie_id = MovieId::from_uuid(query.movie_id); + let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?; + + Ok(profile.map(|p| { + let cast = p.cast.iter().map(resolve_cast).collect(); + let crew = p.crew.iter().map(resolve_crew).collect(); + MovieProfileResult { + profile: p, + cast, + crew, + } + })) +} diff --git a/crates/application/src/movies/mod.rs b/crates/application/src/movies/mod.rs index 225dbac..94a1fc3 100644 --- a/crates/application/src/movies/mod.rs +++ b/crates/application/src/movies/mod.rs @@ -1,10 +1,13 @@ pub mod commands; pub mod discovery_indexer; pub mod enrich_movie; +pub mod get_movie_profile; pub mod get_movies; pub mod queries; +pub mod reindex_search; pub mod search_cleanup; pub mod sync_poster; pub use discovery_indexer::MovieDiscoveryIndexer; +pub use reindex_search::SearchReindexHandler; pub use search_cleanup::SearchCleanupHandler; diff --git a/crates/application/src/movies/reindex_search.rs b/crates/application/src/movies/reindex_search.rs new file mode 100644 index 0000000..0447079 --- /dev/null +++ b/crates/application/src/movies/reindex_search.rs @@ -0,0 +1,165 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::{IndexableDocument, MovieFilter, collections::PageParams}, + ports::EventHandler, +}; +use std::sync::atomic::{AtomicBool, Ordering}; + +use crate::context::AppContext; + +const BATCH_SIZE: u32 = 500; + +pub struct SearchReindexHandler { + ctx: AppContext, + running: AtomicBool, +} + +impl SearchReindexHandler { + pub fn new(ctx: AppContext) -> Self { + Self { + ctx, + running: AtomicBool::new(false), + } + } +} + +#[async_trait] +impl EventHandler for SearchReindexHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + if !matches!(event, DomainEvent::SearchReindexRequested) { + return Ok(()); + } + + if self.running.swap(true, Ordering::SeqCst) { + tracing::info!("search reindex already running, skipping"); + return Ok(()); + } + + let result = self.run_reindex().await; + self.running.store(false, Ordering::SeqCst); + result + } +} + +impl SearchReindexHandler { + async fn run_reindex(&self) -> Result<(), DomainError> { + tracing::info!("search reindex started"); + + let movies_indexed = self.reindex_movies().await?; + let backfilled = self.backfill_persons().await?; + if backfilled > 0 { + tracing::info!(backfilled, "backfilled missing persons from credits"); + } + let persons_indexed = self.reindex_persons().await?; + + tracing::info!(movies_indexed, persons_indexed, "search reindex completed"); + Ok(()) + } + + async fn reindex_movies(&self) -> Result { + let mut count: u64 = 0; + let mut offset: u32 = 0; + loop { + let page = self + .ctx + .repos + .movie + .list_movies( + &PageParams { + limit: BATCH_SIZE, + offset, + }, + &MovieFilter::default(), + ) + .await?; + + for summary in &page.items { + let movie_id = summary.movie.id().clone(); + let profile = self + .ctx + .repos + .movie_profile + .get_by_movie_id(&movie_id) + .await?; + + if let Err(e) = self + .ctx + .repos + .search_command + .index(IndexableDocument::Movie { + id: movie_id.clone(), + movie: Box::new(summary.movie.clone()), + profile: profile.map(Box::new), + }) + .await + { + tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}"); + } + count += 1; + } + + if (page.items.len() as u32) < BATCH_SIZE { + break; + } + offset += BATCH_SIZE; + tokio::task::yield_now().await; + } + Ok(count) + } + + async fn backfill_persons(&self) -> Result { + let mut total = 0u64; + loop { + let (count, has_more) = self + .ctx + .repos + .person_command + .backfill_from_credits_batch(BATCH_SIZE) + .await?; + total += count; + if !has_more { + break; + } + tokio::task::yield_now().await; + } + Ok(total) + } + + async fn reindex_persons(&self) -> Result { + let mut count: u64 = 0; + let mut offset: u32 = 0; + loop { + let persons = self + .ctx + .repos + .person_query + .list_page(BATCH_SIZE, offset) + .await?; + + for person in &persons { + if let Err(e) = self + .ctx + .repos + .search_command + .index(IndexableDocument::Person { + id: person.id().clone(), + person: Box::new(person.clone()), + }) + .await + { + tracing::warn!(person = %person.name(), "reindex person failed: {e}"); + } + count += 1; + } + + if (persons.len() as u32) < BATCH_SIZE { + break; + } + offset += BATCH_SIZE; + tokio::task::yield_now().await; + } + Ok(count) + } +} diff --git a/crates/application/src/tests/worker.rs b/crates/application/src/tests/worker.rs index e0f4bca..65f8300 100644 --- a/crates/application/src/tests/worker.rs +++ b/crates/application/src/tests/worker.rs @@ -61,6 +61,7 @@ impl EventHandler for RecordingHandler { DomainEvent::WatchEventIngested { .. } => "watch_event_ingested", DomainEvent::WrapUpRequested { .. } => "wrapup_requested", DomainEvent::WrapUpCompleted { .. } => "wrapup_completed", + DomainEvent::SearchReindexRequested => "search_reindex", }; self.calls.lock().unwrap().push(label); Ok(()) @@ -85,34 +86,34 @@ async fn dispatches_to_all_handlers() { }; WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)]) - .run() + .run(tokio::sync::watch::channel(false).1) .await; assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]); } #[tokio::test] -async fn nacks_when_handler_fails() { - let nack_called = Arc::new(Mutex::new(false)); +async fn acks_even_when_handler_fails() { + let ack_called = Arc::new(Mutex::new(false)); struct TrackingAck { - nack_called: Arc>, + ack_called: Arc>, } #[async_trait] impl AckHandle for TrackingAck { async fn ack(&self) -> Result<(), DomainError> { + *self.ack_called.lock().unwrap() = true; Ok(()) } async fn nack(&self) -> Result<(), DomainError> { - *self.nack_called.lock().unwrap() = true; Ok(()) } } struct TrackingConsumer { event: DomainEvent, - nack_called: Arc>, + ack_called: Arc>, } impl EventConsumer for TrackingConsumer { @@ -120,7 +121,7 @@ async fn nacks_when_handler_fails() { let envelope = EventEnvelope::new( self.event.clone(), Box::new(TrackingAck { - nack_called: Arc::clone(&self.nack_called), + ack_called: Arc::clone(&self.ack_called), }), ); Box::pin(stream::iter(vec![Ok(envelope)])) @@ -138,14 +139,14 @@ async fn nacks_when_handler_fails() { let consumer = TrackingConsumer { event: movie_discovered(), - nack_called: Arc::clone(&nack_called), + ack_called: Arc::clone(&ack_called), }; WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)]) - .run() + .run(tokio::sync::watch::channel(false).1) .await; - assert!(*nack_called.lock().unwrap()); + assert!(*ack_called.lock().unwrap()); } #[tokio::test] @@ -189,7 +190,9 @@ async fn acks_when_all_handlers_succeed() { ack_called: Arc::clone(&ack_called), }; - WorkerService::new(Arc::new(consumer), vec![]).run().await; + WorkerService::new(Arc::new(consumer), vec![]) + .run(tokio::sync::watch::channel(false).1) + .await; assert!(*ack_called.lock().unwrap()); } diff --git a/crates/application/src/worker.rs b/crates/application/src/worker.rs index a051a30..73602a3 100644 --- a/crates/application/src/worker.rs +++ b/crates/application/src/worker.rs @@ -5,47 +5,73 @@ use domain::{ ports::{EventConsumer, EventHandler}, }; use futures::StreamExt; +use tokio::sync::Semaphore; + +const DEFAULT_CONCURRENCY: usize = 8; pub struct WorkerService { consumer: Arc, handlers: Vec>, + semaphore: Arc, } impl WorkerService { pub fn new(consumer: Arc, handlers: Vec>) -> Self { - Self { consumer, handlers } + Self { + consumer, + handlers, + semaphore: Arc::new(Semaphore::new(DEFAULT_CONCURRENCY)), + } } - pub async fn run(self) { + pub async fn run(self, mut shutdown: tokio::sync::watch::Receiver) { + let handlers = Arc::new(self.handlers); + let mut tasks = tokio::task::JoinSet::new(); let mut stream = self.consumer.consume(); - while let Some(result) = stream.next().await { - match result { - Ok(envelope) => { - tracing::info!(event = ?envelope.event, "received event"); - self.dispatch(envelope).await; - } - Err(e) => tracing::error!("event consumer error: {e}"), - } - } - tracing::info!("event stream ended, worker shutting down"); - } - async fn dispatch(&self, envelope: EventEnvelope) { - let mut all_ok = true; - for handler in &self.handlers { - if let Err(e) = handler.handle(&envelope.event).await { - tracing::error!("event handler error: {e}"); - all_ok = false; + loop { + tokio::select! { + biased; + _ = shutdown.changed() => { + tracing::info!("shutdown signal received, stopping event consumption"); + break; + } + item = stream.next() => { + match item { + Some(Ok(envelope)) => { + tracing::info!(event = ?envelope.event, "received event"); + let permit = self.semaphore.clone().acquire_owned().await; + let Ok(permit) = permit else { break }; + let h = Arc::clone(&handlers); + tasks.spawn(async move { + dispatch(h, envelope).await; + drop(permit); + }); + } + Some(Err(e)) => tracing::error!("event consumer error: {e}"), + None => break, + } + } } } - let result = if all_ok { - envelope.ack().await - } else { - envelope.nack().await - }; - if let Err(e) = result { - tracing::error!("ack/nack failed: {e}"); + + let in_flight = tasks.len(); + if in_flight > 0 { + tracing::info!(in_flight, "draining in-flight tasks before shutdown"); } + while tasks.join_next().await.is_some() {} + tracing::info!("worker shut down gracefully"); + } +} + +async fn dispatch(handlers: Arc>>, envelope: EventEnvelope) { + for handler in handlers.iter() { + if let Err(e) = handler.handle(&envelope.event).await { + tracing::warn!("event handler error (non-fatal): {e}"); + } + } + if let Err(e) = envelope.ack().await { + tracing::error!("ack failed: {e}"); } } diff --git a/crates/application/src/wrapup/compute.rs b/crates/application/src/wrapup/compute.rs index 86555a1..7612986 100644 --- a/crates/application/src/wrapup/compute.rs +++ b/crates/application/src/wrapup/compute.rs @@ -125,6 +125,7 @@ fn build_report( fn movie_ref(r: &WrapUpMovieRow) -> MovieRef { MovieRef { + movie_id: Some(r.movie_id), title: r.title.clone(), year: r.release_year, runtime_minutes: r.runtime_minutes, @@ -233,6 +234,7 @@ fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec, u32) { let count = ratings.len() as u32; let avg = ratings.iter().map(|&r| r as f64).sum::() / ratings.len() as f64; PersonStat { + person_id: None, name, count, avg_rating: avg, @@ -249,12 +251,16 @@ fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec, u32) { } fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec, u32, Vec) { + use domain::models::{ExternalPersonId, PersonId}; + let mut actor_movies: HashMap> = HashMap::new(); let mut actor_profiles: HashMap> = HashMap::new(); + let mut actor_tmdb_ids: HashMap = HashMap::new(); for r in rows { - for (i, (name, billing)) in r.cast_names.iter().enumerate() { + for (i, (name, billing, tmdb_id)) in r.cast_names.iter().enumerate() { if *billing <= 3 { actor_movies.entry(name.clone()).or_default().push(r.rating); + actor_tmdb_ids.entry(name.clone()).or_insert(*tmdb_id); if let Some(path) = r.cast_profile_paths.get(i) { actor_profiles .entry(name.clone()) @@ -269,7 +275,12 @@ fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec, u32, Vec() / ratings.len() as f64; + let person_id = actor_tmdb_ids.get(&name).map(|tid| { + let ext = ExternalPersonId::new(format!("tmdb:{tid}")); + PersonId::from_external(&ext).value() + }); PersonStat { + person_id, name, count, avg_rating: avg, diff --git a/crates/application/src/wrapup/tests/compute.rs b/crates/application/src/wrapup/tests/compute.rs index fd0cf71..57c98b0 100644 --- a/crates/application/src/wrapup/tests/compute.rs +++ b/crates/application/src/wrapup/tests/compute.rs @@ -26,7 +26,7 @@ fn make_row(title: &str, rating: u8, watched_at: &str) -> WrapUpMovieRow { original_language: Some("en".to_string()), genres: vec!["Action".to_string()], keywords: vec!["heist".to_string()], - cast_names: vec![("Actor A".to_string(), 1)], + cast_names: vec![("Actor A".to_string(), 1, 12345)], cast_profile_paths: vec![None], } } diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index ab7a96e..9599782 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -84,6 +84,7 @@ pub enum DomainEvent { WrapUpCompleted { wrapup_id: WrapUpId, }, + SearchReindexRequested, } #[async_trait] diff --git a/crates/domain/src/models/mod.rs b/crates/domain/src/models/mod.rs index 62da1cf..ce07c0f 100644 --- a/crates/domain/src/models/mod.rs +++ b/crates/domain/src/models/mod.rs @@ -461,6 +461,8 @@ impl FeedEntry { pub struct UserSummary { pub user_id: UserId, email: Email, + username: Username, + display_name: Option, pub total_movies: i64, pub avg_rating: Option, pub avatar_path: Option, @@ -470,6 +472,8 @@ impl UserSummary { pub fn new( user_id: UserId, email: Email, + username: Username, + display_name: Option, total_movies: i64, avg_rating: Option, avatar_path: Option, @@ -477,6 +481,8 @@ impl UserSummary { Self { user_id, email, + username, + display_name, total_movies, avg_rating, avatar_path, @@ -485,6 +491,12 @@ impl UserSummary { pub fn email(&self) -> &str { self.email.value() } + pub fn username(&self) -> &str { + self.username.value() + } + pub fn display_name(&self) -> Option<&str> { + self.display_name.as_deref() + } } #[derive(Clone, Debug)] diff --git a/crates/domain/src/models/wrapup.rs b/crates/domain/src/models/wrapup.rs index 6619bb2..8831aad 100644 --- a/crates/domain/src/models/wrapup.rs +++ b/crates/domain/src/models/wrapup.rs @@ -36,6 +36,7 @@ impl DateRange { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct MovieRef { + pub movie_id: Option, pub title: String, pub year: u16, pub runtime_minutes: Option, @@ -50,6 +51,7 @@ pub struct UserRef { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PersonStat { + pub person_id: Option, pub name: String, pub count: u32, pub avg_rating: f64, diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 898c18c..b5db5ba 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -332,6 +332,12 @@ pub trait ImageRefQuery: Send + Sync { pub trait PersonCommand: Send + Sync { /// Upsert a batch of persons. Uses INSERT OR REPLACE (SQLite) / ON CONFLICT DO UPDATE (Postgres). async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError>; + /// Insert a batch of missing persons from movie_cast/movie_crew into the persons table. + /// Returns (inserted_count, has_more). + async fn backfill_from_credits_batch( + &self, + batch_size: u32, + ) -> Result<(u64, bool), DomainError>; } /// Read port — queries persons and credits. No mutations. @@ -347,6 +353,7 @@ pub trait PersonQuery: Send + Sync { /// Returns persons who have no remaining entries in movie_cast or movie_crew. /// Called after movie deletion to find index entries that can be pruned. async fn list_orphaned_persons(&self) -> Result, DomainError>; + async fn list_page(&self, limit: u32, offset: u32) -> Result, DomainError>; } /// Read port — executes search queries. No mutations. @@ -519,7 +526,7 @@ pub struct WrapUpMovieRow { pub original_language: Option, pub genres: Vec, pub keywords: Vec, - pub cast_names: Vec<(String, u32)>, + pub cast_names: Vec<(String, u32, i64)>, pub cast_profile_paths: Vec>, } diff --git a/crates/domain/src/testing.rs b/crates/domain/src/testing.rs index f6783b4..f695bdf 100644 --- a/crates/domain/src/testing.rs +++ b/crates/domain/src/testing.rs @@ -672,6 +672,12 @@ impl PersonCommand for PanicPersonCommand { async fn upsert_batch(&self, _persons: &[Person]) -> Result<(), DomainError> { panic!("PanicPersonCommand called") } + async fn backfill_from_credits_batch( + &self, + _batch_size: u32, + ) -> Result<(u64, bool), DomainError> { + panic!("PanicPersonCommand called") + } } // ── PanicPersonQuery ────────────────────────────────────────────────────────── @@ -698,6 +704,10 @@ impl PersonQuery for PanicPersonQuery { async fn list_orphaned_persons(&self) -> Result, DomainError> { panic!("PanicPersonQuery called") } + + async fn list_page(&self, _limit: u32, _offset: u32) -> Result, DomainError> { + panic!("PanicPersonQuery called") + } } // ── PanicSearchPort ─────────────────────────────────────────────────────────── diff --git a/crates/presentation/src/handlers/api.rs b/crates/presentation/src/handlers/api.rs index 3856d7a..2a5a1ff 100644 --- a/crates/presentation/src/handlers/api.rs +++ b/crates/presentation/src/handlers/api.rs @@ -332,61 +332,67 @@ pub async fn get_movie_profile( State(state): State, Path(movie_id): Path, ) -> impl IntoResponse { - let id = domain::value_objects::MovieId::from_uuid(movie_id); - match state.app_ctx.repos.movie_profile.get_by_movie_id(&id).await { - Ok(Some(p)) => Json(MovieProfileResponse { - tmdb_id: p.tmdb_id, - imdb_id: p.imdb_id, - overview: p.overview, - tagline: p.tagline, - runtime_minutes: p.runtime_minutes, - budget_usd: p.budget_usd, - revenue_usd: p.revenue_usd, - vote_average: p.vote_average, - vote_count: p.vote_count, - original_language: p.original_language, - collection_name: p.collection_name, - genres: p - .genres - .into_iter() - .map(|g| GenreDto { - tmdb_id: g.tmdb_id, - name: g.name, - }) - .collect(), - keywords: p - .keywords - .into_iter() - .map(|k| KeywordDto { - tmdb_id: k.tmdb_id, - name: k.name, - }) - .collect(), - cast: p - .cast - .into_iter() - .map(|c| CastMemberDto { - tmdb_person_id: c.tmdb_person_id, - name: c.name, - character: c.character, - billing_order: c.billing_order, - profile_path: c.profile_path, - }) - .collect(), - crew: p - .crew - .into_iter() - .map(|c| CrewMemberDto { - tmdb_person_id: c.tmdb_person_id, - name: c.name, - job: c.job, - department: c.department, - profile_path: c.profile_path, - }) - .collect(), - enriched_at: p.enriched_at.to_rfc3339(), - }) - .into_response(), + use application::movies::get_movie_profile; + let query = get_movie_profile::GetMovieProfileQuery { movie_id }; + match get_movie_profile::execute(&state.app_ctx, query).await { + Ok(Some(result)) => { + let p = result.profile; + Json(MovieProfileResponse { + tmdb_id: p.tmdb_id, + imdb_id: p.imdb_id, + overview: p.overview, + tagline: p.tagline, + runtime_minutes: p.runtime_minutes, + budget_usd: p.budget_usd, + revenue_usd: p.revenue_usd, + vote_average: p.vote_average, + vote_count: p.vote_count, + original_language: p.original_language, + collection_name: p.collection_name, + genres: p + .genres + .into_iter() + .map(|g| GenreDto { + tmdb_id: g.tmdb_id, + name: g.name, + }) + .collect(), + keywords: p + .keywords + .into_iter() + .map(|k| KeywordDto { + tmdb_id: k.tmdb_id, + name: k.name, + }) + .collect(), + cast: result + .cast + .into_iter() + .map(|c| CastMemberDto { + person_id: c.person_id.value().to_string(), + tmdb_person_id: c.tmdb_person_id, + name: c.name, + character: c.character, + billing_order: c.billing_order, + profile_path: c.profile_path, + }) + .collect(), + crew: result + .crew + .into_iter() + .map(|c| CrewMemberDto { + person_id: c.person_id.value().to_string(), + tmdb_person_id: c.tmdb_person_id, + name: c.name, + job: c.job, + department: c.department, + profile_path: c.profile_path, + }) + .collect(), + enriched_at: p.enriched_at.to_rfc3339(), + }) + .into_response() + } Ok(None) => StatusCode::NOT_FOUND.into_response(), Err(e) => crate::errors::domain_error_response(e), } @@ -982,6 +988,20 @@ pub async fn get_pending_followers( } } +pub async fn post_reindex_search( + State(state): State, + _admin: crate::extractors::AdminApiUser, +) -> impl IntoResponse { + let event = domain::events::DomainEvent::SearchReindexRequested; + match state.app_ctx.services.event_publisher.publish(&event).await { + Ok(()) => StatusCode::ACCEPTED, + Err(e) => { + tracing::error!("failed to publish reindex event: {:?}", e); + StatusCode::INTERNAL_SERVER_ERROR + } + } +} + #[utoipa::path( get, path = "/api/v1/activity-feed", params(ActivityFeedQueryParams), @@ -1032,6 +1052,8 @@ pub async fn list_users(State(state): State) -> Result FeedEntryDto { user_id: e.review().user_id().value(), user_email: e.user_email().to_string(), user_display_name: e.user_display_name().to_string(), + is_federated: e.review().is_remote(), } } diff --git a/crates/presentation/src/mappers/users.rs b/crates/presentation/src/mappers/users.rs index 7091450..abe347e 100644 --- a/crates/presentation/src/mappers/users.rs +++ b/crates/presentation/src/mappers/users.rs @@ -4,7 +4,10 @@ use domain::ports::RemoteActorInfo; use template_askama::{RemoteActorData, RemoteActorDisplay, UserSummaryView}; pub fn user_summary_view(u: &UserSummary) -> UserSummaryView { - let name = u.email().split('@').next().unwrap_or("?").to_string(); + let name = u + .display_name() + .map(String::from) + .unwrap_or_else(|| u.username().to_string()); let initial = name.chars().next().unwrap_or('?').to_ascii_uppercase(); let avg_display = u .avg_rating diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index ca3ed6c..e1f2bd7 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -432,6 +432,10 @@ fn api_routes(rate_limit: u64) -> Router { .route( "/wrapups/{id}/video", routing::get(handlers::wrapup::get_video), + ) + .route( + "/admin/reindex-search", + routing::post(handlers::api::post_reindex_search), ); #[cfg(feature = "federation")] diff --git a/crates/presentation/src/tests/api_handlers.rs b/crates/presentation/src/tests/api_handlers.rs index 72d9bc5..9209f0d 100644 --- a/crates/presentation/src/tests/api_handlers.rs +++ b/crates/presentation/src/tests/api_handlers.rs @@ -60,6 +60,13 @@ impl domain::ports::PersonQuery for PersonQueryStub { async fn list_orphaned_persons(&self) -> Result, DomainError> { Ok(vec![]) } + async fn list_page( + &self, + _limit: u32, + _offset: u32, + ) -> Result, DomainError> { + Ok(vec![]) + } } // --- Search endpoint tests --- diff --git a/crates/presentation/src/tests/extractors.rs b/crates/presentation/src/tests/extractors.rs index df525ef..b4de426 100644 --- a/crates/presentation/src/tests/extractors.rs +++ b/crates/presentation/src/tests/extractors.rs @@ -431,6 +431,12 @@ impl PersonCommand for Panic { async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> { panic!() } + async fn backfill_from_credits_batch( + &self, + _batch_size: u32, + ) -> Result<(u64, bool), DomainError> { + panic!() + } } #[async_trait::async_trait] impl PersonQuery for Panic { @@ -449,6 +455,13 @@ impl PersonQuery for Panic { async fn list_orphaned_persons(&self) -> Result, DomainError> { panic!() } + async fn list_page( + &self, + _limit: u32, + _offset: u32, + ) -> Result, DomainError> { + panic!() + } } #[async_trait::async_trait] impl SearchPort for Panic { diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index 3f97fa9..01d327c 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -297,6 +297,12 @@ impl PersonCommand for PanicPersonCommand { async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> { panic!() } + async fn backfill_from_credits_batch( + &self, + _batch_size: u32, + ) -> Result<(u64, bool), DomainError> { + panic!() + } } struct PanicPersonQuery; @@ -317,6 +323,13 @@ impl PersonQuery for PanicPersonQuery { async fn list_orphaned_persons(&self) -> Result, DomainError> { panic!() } + async fn list_page( + &self, + _limit: u32, + _offset: u32, + ) -> Result, DomainError> { + panic!() + } } struct PanicSearchPort; diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 2c95f17..ce0dde1 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use anyhow::Context; use application::{ - MovieDiscoveryIndexer, SearchCleanupHandler, + MovieDiscoveryIndexer, SearchCleanupHandler, SearchReindexHandler, config::AppConfig, context::{AppContext, Repositories, Services}, worker::WorkerService, @@ -232,12 +232,15 @@ async fn main() -> anyhow::Result<()> { let wrapup_handler = Arc::new( application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()), ) as Arc; + let reindex_handler = + Arc::new(SearchReindexHandler::new(ctx.clone())) as Arc; let mut h = vec![ poster, cleanup, search_cleanup, discovery_indexer, wrapup_handler, + reindex_handler, ]; if let Some(e) = enrichment_handler { h.push(e); @@ -282,6 +285,8 @@ async fn main() -> anyhow::Result<()> { let wrapup_handler = Arc::new( application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()), ) as Arc; + let reindex_handler = + Arc::new(SearchReindexHandler::new(ctx.clone())) as Arc; let mut h = vec![ poster, cleanup, @@ -290,6 +295,7 @@ async fn main() -> anyhow::Result<()> { search_cleanup, discovery_indexer, wrapup_handler, + reindex_handler, ]; if let Some(e) = enrichment_handler { h.push(e); @@ -303,10 +309,15 @@ async fn main() -> anyhow::Result<()> { // ── Run ─────────────────────────────────────────────────────────────────── + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + let _ = shutdown_tx.send(true); + }); + let worker = WorkerService::new(consumer_arc, handlers); tracing::info!("worker started"); - worker.run().await; - tracing::info!("worker stopped"); + worker.run(shutdown_rx).await; Ok(()) }