Compare commits

...

2 Commits

Author SHA1 Message Date
01c1082290 feat: SPA bug fixes, interactivity, federation badges, admin reindex
Some checks failed
CI / Check / Test (push) Failing after 10m55s
- fix wrapup status "completed" → "Ready"
- fix unfollow sending {handle} instead of {actor_url}
- fix missing post import in users.ts
- fix feed/activity cache not invalidated on review delete/log
- add person_id to cast/crew types, link to /people pages
- add movie_id to wrapup MovieRef, link highlights to /movies pages
- add wrapup actor profile images + clickable person links
- add federated review globe badge in feed and movie detail
- add fediverse handle (@user@instance) in follower/following cards
- add admin reindex search button in settings
- add wrapup user picker for admins
- add username/display_name to user summary type
- use tmdbProfileUrl for person search results
2026-06-04 14:43:41 +02:00
bd7dc648c4 feat: search reindex, worker improvements, person IDs, user display names
- add admin POST /api/v1/admin/reindex-search endpoint + event-driven handler
- backfill persons from movie_cast/movie_crew into persons table
- paginate person list_page/backfill_from_credits_batch to cap memory
- concurrent worker event dispatch with semaphore (max 8)
- graceful worker shutdown (drain in-flight tasks on SIGINT)
- always ack events, log handler errors as warnings (no infinite retry)
- NATS ack_wait 600s, AtomicBool guard against concurrent reindex
- add username/display_name to UserSummaryDto and users list
- add person_id to CastMemberDto/CrewMemberDto via get_movie_profile use case
- add movie_id to wrapup MovieRef, person_id to wrapup PersonStat
- thread tmdb_person_id through wrapup cast pipeline
- add is_federated to FeedEntryDto
- cap orphaned persons query with LIMIT 500
- add SPA link to classic site footer
2026-06-04 14:43:28 +02:00
54 changed files with 852 additions and 148 deletions

View File

@@ -86,6 +86,7 @@ pub enum EventPayload {
WrapUpCompleted {
wrapup_id: String,
},
SearchReindexRequested,
}
impl EventPayload {
@@ -107,6 +108,7 @@ impl EventPayload {
EventPayload::WatchEventIngested { .. } => "WatchEventIngested",
EventPayload::WrapUpRequested { .. } => "WrapUpRequested",
EventPayload::WrapUpCompleted { .. } => "WrapUpCompleted",
EventPayload::SearchReindexRequested => "SearchReindexRequested",
}
}
}
@@ -248,6 +250,7 @@ impl From<&DomainEvent> for EventPayload {
DomainEvent::WrapUpCompleted { wrapup_id } => EventPayload::WrapUpCompleted {
wrapup_id: wrapup_id.value().to_string(),
},
DomainEvent::SearchReindexRequested => EventPayload::SearchReindexRequested,
}
}
}
@@ -398,6 +401,7 @@ impl TryFrom<EventPayload> for DomainEvent {
wrapup_id: WrapUpId::from_uuid(wid),
})
}
EventPayload::SearchReindexRequested => Ok(DomainEvent::SearchReindexRequested),
}
}
}

View File

@@ -120,6 +120,7 @@ impl NatsJetStreamConsumer {
pull::Config {
durable_name: Some(cfg.consumer_name.clone()),
filter_subject: subject_filter,
ack_wait: std::time::Duration::from_secs(600),
..Default::default()
},
)

View File

@@ -18,6 +18,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
DomainEvent::WatchEventIngested { .. } => "watch.event.ingested",
DomainEvent::WrapUpRequested { .. } => "wrapup.requested",
DomainEvent::WrapUpCompleted { .. } => "wrapup.completed",
DomainEvent::SearchReindexRequested => "search.reindex.requested",
};
format!("{prefix}.{suffix}")
}

View File

@@ -7,7 +7,7 @@ use domain::{
},
value_objects::{
Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear,
ReviewId, UserId,
ReviewId, UserId, Username,
},
};
use uuid::Uuid;
@@ -237,6 +237,8 @@ impl MovieStatsRow {
pub(crate) struct UserSummaryRow {
pub id: String,
pub email: String,
pub username: String,
pub display_name: Option<String>,
pub total_movies: i64,
pub avg_rating: Option<f64>,
pub avatar_path: Option<String>,
@@ -247,6 +249,8 @@ impl UserSummaryRow {
Ok(UserSummary::new(
UserId::from_uuid(parse_uuid(&self.id)?),
Email::new(self.email)?,
Username::new(self.username)?,
self.display_name,
self.total_movies,
self.avg_rating,
self.avatar_path,

View File

@@ -57,6 +57,60 @@ impl PersonCommand for PostgresPersonAdapter {
}
Ok(())
}
async fn backfill_from_credits_batch(
&self,
batch_size: u32,
) -> Result<(u64, bool), DomainError> {
#[derive(sqlx::FromRow)]
struct MissingPerson {
tmdb_person_id: i64,
name: String,
department: Option<String>,
profile_path: Option<String>,
}
let rows = sqlx::query_as::<_, MissingPerson>(
"SELECT mc.tmdb_person_id, mc.name, 'Acting' AS department, mc.profile_path
FROM movie_cast mc
WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id)
GROUP BY mc.tmdb_person_id, mc.name, mc.profile_path
UNION ALL
SELECT mc.tmdb_person_id, mc.name, mc.department, mc.profile_path
FROM movie_crew mc
WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id)
AND NOT EXISTS (SELECT 1 FROM movie_cast c2 WHERE c2.tmdb_person_id = mc.tmdb_person_id)
GROUP BY mc.tmdb_person_id, mc.name, mc.department, mc.profile_path
LIMIT $1",
)
.bind(batch_size as i64)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
let has_more = rows.len() as u32 >= batch_size;
let mut count = 0u64;
for row in &rows {
let ext = ExternalPersonId::new(format!("tmdb:{}", row.tmdb_person_id));
let pid = PersonId::from_external(&ext);
sqlx::query(
"INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT(tmdb_person_id) DO NOTHING",
)
.bind(pid.value().to_string())
.bind(ext.value())
.bind(row.tmdb_person_id)
.bind(&row.name)
.bind(&row.department)
.bind(&row.profile_path)
.execute(&self.pool)
.await
.map_err(map_err)?;
count += 1;
}
Ok((count, has_more))
}
}
#[async_trait]
@@ -206,6 +260,40 @@ impl PersonQuery for PostgresPersonAdapter {
Ok(PersonCredits { person, cast, crew })
}
async fn list_page(&self, limit: u32, offset: u32) -> Result<Vec<Person>, DomainError> {
#[derive(sqlx::FromRow)]
struct Row {
id: String,
external_id: String,
name: String,
known_for_department: Option<String>,
profile_path: Option<String>,
}
let rows = sqlx::query_as::<_, Row>(
"SELECT id, external_id, name, known_for_department, profile_path FROM persons ORDER BY id LIMIT $1 OFFSET $2",
)
.bind(limit as i64)
.bind(offset as i64)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
Ok(rows
.into_iter()
.map(|r| {
let ext = ExternalPersonId::new(r.external_id);
Person::new(
PersonId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()),
ext,
r.name,
r.known_for_department,
r.profile_path,
)
})
.collect())
}
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
let rows: Vec<(String,)> = sqlx::query_as(
"SELECT id FROM persons
@@ -214,7 +302,8 @@ impl PersonQuery for PostgresPersonAdapter {
)
AND NOT EXISTS (
SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id
)",
)
LIMIT 500",
)
.fetch_all(&self.pool)
.await

View File

@@ -186,13 +186,13 @@ impl UserRepository for PostgresUserRepository {
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> {
sqlx::query_as::<_, UserSummaryRow>(
r#"SELECT u.id, u.email,
r#"SELECT u.id, u.email, u.username, u.display_name,
COUNT(DISTINCT r.movie_id) AS total_movies,
AVG(r.rating::float) AS avg_rating,
u.avatar_path
FROM users u
LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL
GROUP BY u.id, u.email, u.avatar_path
GROUP BY u.id, u.email, u.username, u.display_name, u.avatar_path
ORDER BY u.email ASC"#,
)
.fetch_all(&self.pool)

View File

@@ -333,9 +333,9 @@ impl WrapUpStatsQuery for PostgresWrapUpStatsQuery {
let keywords = keywords_map.get(&movie_id_str).cloned().unwrap_or_default();
let cast = cast_map.get(&movie_id_str).cloned().unwrap_or_default();
let cast_names: Vec<(String, u32)> = cast
let cast_names: Vec<(String, u32, i64)> = cast
.iter()
.map(|c| (c.name.clone(), c.billing_order))
.map(|c| (c.name.clone(), c.billing_order, c.tmdb_person_id))
.collect();
let cast_profile_paths: Vec<Option<String>> =
cast.iter().map(|c| c.profile_path.clone()).collect();
@@ -367,6 +367,7 @@ impl WrapUpStatsQuery for PostgresWrapUpStatsQuery {
struct CastEntry {
name: String,
billing_order: u32,
tmdb_person_id: i64,
profile_path: Option<String>,
}
@@ -417,7 +418,7 @@ async fn fetch_cast_pg(
movie_ids: &[String],
) -> Result<HashMap<String, Vec<CastEntry>>, DomainError> {
let rows = sqlx::query(
"SELECT movie_id, name, billing_order, profile_path \
"SELECT movie_id, name, billing_order, tmdb_person_id, profile_path \
FROM movie_cast \
WHERE movie_id = ANY($1) AND billing_order <= 3 \
ORDER BY billing_order ASC",
@@ -432,10 +433,12 @@ async fn fetch_cast_pg(
let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?;
let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?;
let tmdb_person_id: i64 = row.try_get("tmdb_person_id").map_err(map_err)?;
let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?;
map.entry(mid).or_default().push(CastEntry {
name,
billing_order: billing_order as u32,
tmdb_person_id,
profile_path,
});
}

View File

@@ -7,7 +7,7 @@ use domain::{
},
value_objects::{
Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear,
ReviewId, UserId, WatchlistEntryId,
ReviewId, UserId, Username, WatchlistEntryId,
},
};
use uuid::Uuid;
@@ -245,6 +245,8 @@ impl FeedRow {
pub(crate) struct UserSummaryRow {
pub id: String,
pub email: String,
pub username: String,
pub display_name: Option<String>,
pub total_movies: i64,
pub avg_rating: Option<f64>,
pub avatar_path: Option<String>,
@@ -255,6 +257,8 @@ impl UserSummaryRow {
Ok(UserSummary::new(
UserId::from_uuid(parse_uuid(&self.id)?),
Email::new(self.email)?,
Username::new(self.username)?,
self.display_name,
self.total_movies,
self.avg_rating,
self.avatar_path,

View File

@@ -57,6 +57,60 @@ impl PersonCommand for SqlitePersonAdapter {
}
Ok(())
}
async fn backfill_from_credits_batch(
&self,
batch_size: u32,
) -> Result<(u64, bool), DomainError> {
#[derive(sqlx::FromRow)]
struct MissingPerson {
tmdb_person_id: i64,
name: String,
department: Option<String>,
profile_path: Option<String>,
}
let rows = sqlx::query_as::<_, MissingPerson>(
"SELECT mc.tmdb_person_id, mc.name, 'Acting' AS department, mc.profile_path
FROM movie_cast mc
WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id)
GROUP BY mc.tmdb_person_id
UNION ALL
SELECT mc.tmdb_person_id, mc.name, mc.department, mc.profile_path
FROM movie_crew mc
WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id)
AND NOT EXISTS (SELECT 1 FROM movie_cast c2 WHERE c2.tmdb_person_id = mc.tmdb_person_id)
GROUP BY mc.tmdb_person_id
LIMIT ?",
)
.bind(batch_size)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
let has_more = rows.len() as u32 >= batch_size;
let mut count = 0u64;
for row in &rows {
let ext = ExternalPersonId::new(format!("tmdb:{}", row.tmdb_person_id));
let pid = PersonId::from_external(&ext);
sqlx::query(
"INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(tmdb_person_id) DO NOTHING",
)
.bind(pid.value().to_string())
.bind(ext.value())
.bind(row.tmdb_person_id)
.bind(&row.name)
.bind(&row.department)
.bind(&row.profile_path)
.execute(&self.pool)
.await
.map_err(map_err)?;
count += 1;
}
Ok((count, has_more))
}
}
#[async_trait]
@@ -156,6 +210,19 @@ impl PersonQuery for SqlitePersonAdapter {
Ok(PersonCredits { person, cast, crew })
}
async fn list_page(&self, limit: u32, offset: u32) -> Result<Vec<Person>, DomainError> {
let rows = sqlx::query_as::<_, PersonRow>(
"SELECT id, external_id, name, known_for_department, profile_path FROM persons ORDER BY id LIMIT ? OFFSET ?",
)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
Ok(rows.into_iter().map(PersonRow::into_person).collect())
}
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
let rows: Vec<(String,)> = sqlx::query_as(
"SELECT id FROM persons
@@ -164,7 +231,8 @@ impl PersonQuery for SqlitePersonAdapter {
)
AND NOT EXISTS (
SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id
)",
)
LIMIT 500",
)
.fetch_all(&self.pool)
.await

View File

@@ -182,17 +182,15 @@ impl UserRepository for SqliteUserRepository {
}
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> {
sqlx::query_as!(
UserSummaryRow,
r#"SELECT u.id AS "id!: String",
u.email AS "email!: String",
COUNT(DISTINCT r.movie_id) AS "total_movies!: i64",
sqlx::query_as::<_, UserSummaryRow>(
r#"SELECT u.id, u.email, u.username, u.display_name,
COUNT(DISTINCT r.movie_id) AS total_movies,
AVG(CAST(r.rating AS REAL)) AS avg_rating,
u.avatar_path
FROM users u
LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL
GROUP BY u.id, u.email, u.avatar_path
ORDER BY u.email ASC"#
GROUP BY u.id, u.email, u.username, u.display_name, u.avatar_path
ORDER BY u.email ASC"#,
)
.fetch_all(&self.pool)
.await

View File

@@ -345,9 +345,9 @@ impl WrapUpStatsQuery for SqliteWrapUpStatsQuery {
let keywords = keywords_map.get(&movie_id_str).cloned().unwrap_or_default();
let cast = cast_map.get(&movie_id_str).cloned().unwrap_or_default();
let cast_names: Vec<(String, u32)> = cast
let cast_names: Vec<(String, u32, i64)> = cast
.iter()
.map(|c| (c.name.clone(), c.billing_order))
.map(|c| (c.name.clone(), c.billing_order, c.tmdb_person_id))
.collect();
let cast_profile_paths: Vec<Option<String>> =
cast.iter().map(|c| c.profile_path.clone()).collect();
@@ -379,6 +379,7 @@ impl WrapUpStatsQuery for SqliteWrapUpStatsQuery {
struct CastEntry {
name: String,
billing_order: u32,
tmdb_person_id: i64,
profile_path: Option<String>,
}
@@ -453,7 +454,7 @@ async fn fetch_cast_sqlite(
return Ok(HashMap::new());
}
let sql = format!(
"SELECT movie_id, name, billing_order, profile_path \
"SELECT movie_id, name, billing_order, tmdb_person_id, profile_path \
FROM movie_cast \
WHERE movie_id IN ({}) AND billing_order <= 3 \
ORDER BY billing_order ASC",
@@ -470,10 +471,12 @@ async fn fetch_cast_sqlite(
let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?;
let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?;
let tmdb_person_id: i64 = row.try_get("tmdb_person_id").map_err(map_err)?;
let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?;
map.entry(mid).or_default().push(CastEntry {
name,
billing_order: billing_order as u32,
tmdb_person_id,
profile_path,
});
}

View File

@@ -63,6 +63,8 @@
{% endif %}
<span class="footer-sep">·</span>
<a href="/docs" target="_blank" class="footer-link">API Docs</a>
<span class="footer-sep">·</span>
<a href="/app/" class="footer-link">Mobile App</a>
</footer>
</body>
</html>

View File

@@ -57,6 +57,7 @@ pub struct FeedEntryDto {
pub user_id: Uuid,
pub user_email: String,
pub user_display_name: String,
pub is_federated: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]

View File

@@ -40,6 +40,7 @@ pub struct KeywordDto {
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct CastMemberDto {
pub person_id: String,
pub tmdb_person_id: u64,
pub name: String,
pub character: String,
@@ -49,6 +50,7 @@ pub struct CastMemberDto {
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct CrewMemberDto {
pub person_id: String,
pub tmdb_person_id: u64,
pub name: String,
pub job: String,

View File

@@ -7,6 +7,8 @@ use crate::diary::{DiaryEntryDto, DiaryResponse};
pub struct UserSummaryDto {
pub id: Uuid,
pub email: String,
pub username: String,
pub display_name: Option<String>,
pub total_movies: i64,
pub avg_rating: Option<f64>,
}

View File

@@ -20,3 +20,4 @@ pub mod test_helpers;
pub use movies::MovieDiscoveryIndexer;
pub use movies::SearchCleanupHandler;
pub use movies::SearchReindexHandler;

View File

@@ -0,0 +1,78 @@
use domain::{
errors::DomainError,
models::{CastMember, CrewMember, ExternalPersonId, MovieProfile, PersonId},
value_objects::MovieId,
};
use uuid::Uuid;
use crate::context::AppContext;
pub struct GetMovieProfileQuery {
pub movie_id: Uuid,
}
pub struct CastMemberWithId {
pub person_id: PersonId,
pub tmdb_person_id: u64,
pub name: String,
pub character: String,
pub billing_order: u32,
pub profile_path: Option<String>,
}
pub struct CrewMemberWithId {
pub person_id: PersonId,
pub tmdb_person_id: u64,
pub name: String,
pub job: String,
pub department: String,
pub profile_path: Option<String>,
}
pub struct MovieProfileResult {
pub profile: MovieProfile,
pub cast: Vec<CastMemberWithId>,
pub crew: Vec<CrewMemberWithId>,
}
fn resolve_cast(member: &CastMember) -> CastMemberWithId {
let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id));
CastMemberWithId {
person_id: PersonId::from_external(&ext),
tmdb_person_id: member.tmdb_person_id,
name: member.name.clone(),
character: member.character.clone(),
billing_order: member.billing_order,
profile_path: member.profile_path.clone(),
}
}
fn resolve_crew(member: &CrewMember) -> CrewMemberWithId {
let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id));
CrewMemberWithId {
person_id: PersonId::from_external(&ext),
tmdb_person_id: member.tmdb_person_id,
name: member.name.clone(),
job: member.job.clone(),
department: member.department.clone(),
profile_path: member.profile_path.clone(),
}
}
pub async fn execute(
ctx: &AppContext,
query: GetMovieProfileQuery,
) -> Result<Option<MovieProfileResult>, DomainError> {
let movie_id = MovieId::from_uuid(query.movie_id);
let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?;
Ok(profile.map(|p| {
let cast = p.cast.iter().map(resolve_cast).collect();
let crew = p.crew.iter().map(resolve_crew).collect();
MovieProfileResult {
profile: p,
cast,
crew,
}
}))
}

View File

@@ -1,10 +1,13 @@
pub mod commands;
pub mod discovery_indexer;
pub mod enrich_movie;
pub mod get_movie_profile;
pub mod get_movies;
pub mod queries;
pub mod reindex_search;
pub mod search_cleanup;
pub mod sync_poster;
pub use discovery_indexer::MovieDiscoveryIndexer;
pub use reindex_search::SearchReindexHandler;
pub use search_cleanup::SearchCleanupHandler;

View File

@@ -0,0 +1,165 @@
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
models::{IndexableDocument, MovieFilter, collections::PageParams},
ports::EventHandler,
};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::context::AppContext;
const BATCH_SIZE: u32 = 500;
pub struct SearchReindexHandler {
ctx: AppContext,
running: AtomicBool,
}
impl SearchReindexHandler {
pub fn new(ctx: AppContext) -> Self {
Self {
ctx,
running: AtomicBool::new(false),
}
}
}
#[async_trait]
impl EventHandler for SearchReindexHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
if !matches!(event, DomainEvent::SearchReindexRequested) {
return Ok(());
}
if self.running.swap(true, Ordering::SeqCst) {
tracing::info!("search reindex already running, skipping");
return Ok(());
}
let result = self.run_reindex().await;
self.running.store(false, Ordering::SeqCst);
result
}
}
impl SearchReindexHandler {
async fn run_reindex(&self) -> Result<(), DomainError> {
tracing::info!("search reindex started");
let movies_indexed = self.reindex_movies().await?;
let backfilled = self.backfill_persons().await?;
if backfilled > 0 {
tracing::info!(backfilled, "backfilled missing persons from credits");
}
let persons_indexed = self.reindex_persons().await?;
tracing::info!(movies_indexed, persons_indexed, "search reindex completed");
Ok(())
}
async fn reindex_movies(&self) -> Result<u64, DomainError> {
let mut count: u64 = 0;
let mut offset: u32 = 0;
loop {
let page = self
.ctx
.repos
.movie
.list_movies(
&PageParams {
limit: BATCH_SIZE,
offset,
},
&MovieFilter::default(),
)
.await?;
for summary in &page.items {
let movie_id = summary.movie.id().clone();
let profile = self
.ctx
.repos
.movie_profile
.get_by_movie_id(&movie_id)
.await?;
if let Err(e) = self
.ctx
.repos
.search_command
.index(IndexableDocument::Movie {
id: movie_id.clone(),
movie: Box::new(summary.movie.clone()),
profile: profile.map(Box::new),
})
.await
{
tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}");
}
count += 1;
}
if (page.items.len() as u32) < BATCH_SIZE {
break;
}
offset += BATCH_SIZE;
tokio::task::yield_now().await;
}
Ok(count)
}
async fn backfill_persons(&self) -> Result<u64, DomainError> {
let mut total = 0u64;
loop {
let (count, has_more) = self
.ctx
.repos
.person_command
.backfill_from_credits_batch(BATCH_SIZE)
.await?;
total += count;
if !has_more {
break;
}
tokio::task::yield_now().await;
}
Ok(total)
}
async fn reindex_persons(&self) -> Result<u64, DomainError> {
let mut count: u64 = 0;
let mut offset: u32 = 0;
loop {
let persons = self
.ctx
.repos
.person_query
.list_page(BATCH_SIZE, offset)
.await?;
for person in &persons {
if let Err(e) = self
.ctx
.repos
.search_command
.index(IndexableDocument::Person {
id: person.id().clone(),
person: Box::new(person.clone()),
})
.await
{
tracing::warn!(person = %person.name(), "reindex person failed: {e}");
}
count += 1;
}
if (persons.len() as u32) < BATCH_SIZE {
break;
}
offset += BATCH_SIZE;
tokio::task::yield_now().await;
}
Ok(count)
}
}

View File

@@ -61,6 +61,7 @@ impl EventHandler for RecordingHandler {
DomainEvent::WatchEventIngested { .. } => "watch_event_ingested",
DomainEvent::WrapUpRequested { .. } => "wrapup_requested",
DomainEvent::WrapUpCompleted { .. } => "wrapup_completed",
DomainEvent::SearchReindexRequested => "search_reindex",
};
self.calls.lock().unwrap().push(label);
Ok(())
@@ -85,34 +86,34 @@ async fn dispatches_to_all_handlers() {
};
WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)])
.run()
.run(tokio::sync::watch::channel(false).1)
.await;
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
}
#[tokio::test]
async fn nacks_when_handler_fails() {
let nack_called = Arc::new(Mutex::new(false));
async fn acks_even_when_handler_fails() {
let ack_called = Arc::new(Mutex::new(false));
struct TrackingAck {
nack_called: Arc<Mutex<bool>>,
ack_called: Arc<Mutex<bool>>,
}
#[async_trait]
impl AckHandle for TrackingAck {
async fn ack(&self) -> Result<(), DomainError> {
*self.ack_called.lock().unwrap() = true;
Ok(())
}
async fn nack(&self) -> Result<(), DomainError> {
*self.nack_called.lock().unwrap() = true;
Ok(())
}
}
struct TrackingConsumer {
event: DomainEvent,
nack_called: Arc<Mutex<bool>>,
ack_called: Arc<Mutex<bool>>,
}
impl EventConsumer for TrackingConsumer {
@@ -120,7 +121,7 @@ async fn nacks_when_handler_fails() {
let envelope = EventEnvelope::new(
self.event.clone(),
Box::new(TrackingAck {
nack_called: Arc::clone(&self.nack_called),
ack_called: Arc::clone(&self.ack_called),
}),
);
Box::pin(stream::iter(vec![Ok(envelope)]))
@@ -138,14 +139,14 @@ async fn nacks_when_handler_fails() {
let consumer = TrackingConsumer {
event: movie_discovered(),
nack_called: Arc::clone(&nack_called),
ack_called: Arc::clone(&ack_called),
};
WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)])
.run()
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(*nack_called.lock().unwrap());
assert!(*ack_called.lock().unwrap());
}
#[tokio::test]
@@ -189,7 +190,9 @@ async fn acks_when_all_handlers_succeed() {
ack_called: Arc::clone(&ack_called),
};
WorkerService::new(Arc::new(consumer), vec![]).run().await;
WorkerService::new(Arc::new(consumer), vec![])
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(*ack_called.lock().unwrap());
}

View File

@@ -5,47 +5,73 @@ use domain::{
ports::{EventConsumer, EventHandler},
};
use futures::StreamExt;
use tokio::sync::Semaphore;
const DEFAULT_CONCURRENCY: usize = 8;
pub struct WorkerService {
consumer: Arc<dyn EventConsumer>,
handlers: Vec<Arc<dyn EventHandler>>,
semaphore: Arc<Semaphore>,
}
impl WorkerService {
pub fn new(consumer: Arc<dyn EventConsumer>, handlers: Vec<Arc<dyn EventHandler>>) -> Self {
Self { consumer, handlers }
Self {
consumer,
handlers,
semaphore: Arc::new(Semaphore::new(DEFAULT_CONCURRENCY)),
}
}
pub async fn run(self) {
pub async fn run(self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
let handlers = Arc::new(self.handlers);
let mut tasks = tokio::task::JoinSet::new();
let mut stream = self.consumer.consume();
while let Some(result) = stream.next().await {
match result {
Ok(envelope) => {
tracing::info!(event = ?envelope.event, "received event");
self.dispatch(envelope).await;
}
Err(e) => tracing::error!("event consumer error: {e}"),
}
}
tracing::info!("event stream ended, worker shutting down");
}
async fn dispatch(&self, envelope: EventEnvelope) {
let mut all_ok = true;
for handler in &self.handlers {
if let Err(e) = handler.handle(&envelope.event).await {
tracing::error!("event handler error: {e}");
all_ok = false;
loop {
tokio::select! {
biased;
_ = shutdown.changed() => {
tracing::info!("shutdown signal received, stopping event consumption");
break;
}
item = stream.next() => {
match item {
Some(Ok(envelope)) => {
tracing::info!(event = ?envelope.event, "received event");
let permit = self.semaphore.clone().acquire_owned().await;
let Ok(permit) = permit else { break };
let h = Arc::clone(&handlers);
tasks.spawn(async move {
dispatch(h, envelope).await;
drop(permit);
});
}
Some(Err(e)) => tracing::error!("event consumer error: {e}"),
None => break,
}
}
}
}
let result = if all_ok {
envelope.ack().await
} else {
envelope.nack().await
};
if let Err(e) = result {
tracing::error!("ack/nack failed: {e}");
let in_flight = tasks.len();
if in_flight > 0 {
tracing::info!(in_flight, "draining in-flight tasks before shutdown");
}
while tasks.join_next().await.is_some() {}
tracing::info!("worker shut down gracefully");
}
}
async fn dispatch(handlers: Arc<Vec<Arc<dyn EventHandler>>>, envelope: EventEnvelope) {
for handler in handlers.iter() {
if let Err(e) = handler.handle(&envelope.event).await {
tracing::warn!("event handler error (non-fatal): {e}");
}
}
if let Err(e) = envelope.ack().await {
tracing::error!("ack failed: {e}");
}
}

View File

@@ -125,6 +125,7 @@ fn build_report(
fn movie_ref(r: &WrapUpMovieRow) -> MovieRef {
MovieRef {
movie_id: Some(r.movie_id),
title: r.title.clone(),
year: r.release_year,
runtime_minutes: r.runtime_minutes,
@@ -233,6 +234,7 @@ fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32) {
let count = ratings.len() as u32;
let avg = ratings.iter().map(|&r| r as f64).sum::<f64>() / ratings.len() as f64;
PersonStat {
person_id: None,
name,
count,
avg_rating: avg,
@@ -249,12 +251,16 @@ fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32) {
}
fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32, Vec<String>) {
use domain::models::{ExternalPersonId, PersonId};
let mut actor_movies: HashMap<String, Vec<u8>> = HashMap::new();
let mut actor_profiles: HashMap<String, Option<String>> = HashMap::new();
let mut actor_tmdb_ids: HashMap<String, i64> = HashMap::new();
for r in rows {
for (i, (name, billing)) in r.cast_names.iter().enumerate() {
for (i, (name, billing, tmdb_id)) in r.cast_names.iter().enumerate() {
if *billing <= 3 {
actor_movies.entry(name.clone()).or_default().push(r.rating);
actor_tmdb_ids.entry(name.clone()).or_insert(*tmdb_id);
if let Some(path) = r.cast_profile_paths.get(i) {
actor_profiles
.entry(name.clone())
@@ -269,7 +275,12 @@ fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32, Vec<St
.map(|(name, ratings)| {
let count = ratings.len() as u32;
let avg = ratings.iter().map(|&r| r as f64).sum::<f64>() / ratings.len() as f64;
let person_id = actor_tmdb_ids.get(&name).map(|tid| {
let ext = ExternalPersonId::new(format!("tmdb:{tid}"));
PersonId::from_external(&ext).value()
});
PersonStat {
person_id,
name,
count,
avg_rating: avg,

View File

@@ -26,7 +26,7 @@ fn make_row(title: &str, rating: u8, watched_at: &str) -> WrapUpMovieRow {
original_language: Some("en".to_string()),
genres: vec!["Action".to_string()],
keywords: vec!["heist".to_string()],
cast_names: vec![("Actor A".to_string(), 1)],
cast_names: vec![("Actor A".to_string(), 1, 12345)],
cast_profile_paths: vec![None],
}
}

View File

@@ -84,6 +84,7 @@ pub enum DomainEvent {
WrapUpCompleted {
wrapup_id: WrapUpId,
},
SearchReindexRequested,
}
#[async_trait]

View File

@@ -461,6 +461,8 @@ impl FeedEntry {
pub struct UserSummary {
pub user_id: UserId,
email: Email,
username: Username,
display_name: Option<String>,
pub total_movies: i64,
pub avg_rating: Option<f64>,
pub avatar_path: Option<String>,
@@ -470,6 +472,8 @@ impl UserSummary {
pub fn new(
user_id: UserId,
email: Email,
username: Username,
display_name: Option<String>,
total_movies: i64,
avg_rating: Option<f64>,
avatar_path: Option<String>,
@@ -477,6 +481,8 @@ impl UserSummary {
Self {
user_id,
email,
username,
display_name,
total_movies,
avg_rating,
avatar_path,
@@ -485,6 +491,12 @@ impl UserSummary {
pub fn email(&self) -> &str {
self.email.value()
}
pub fn username(&self) -> &str {
self.username.value()
}
pub fn display_name(&self) -> Option<&str> {
self.display_name.as_deref()
}
}
#[derive(Clone, Debug)]

View File

@@ -36,6 +36,7 @@ impl DateRange {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MovieRef {
pub movie_id: Option<Uuid>,
pub title: String,
pub year: u16,
pub runtime_minutes: Option<u32>,
@@ -50,6 +51,7 @@ pub struct UserRef {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersonStat {
pub person_id: Option<Uuid>,
pub name: String,
pub count: u32,
pub avg_rating: f64,

View File

@@ -332,6 +332,12 @@ pub trait ImageRefQuery: Send + Sync {
pub trait PersonCommand: Send + Sync {
/// Upsert a batch of persons. Uses INSERT OR REPLACE (SQLite) / ON CONFLICT DO UPDATE (Postgres).
async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError>;
/// Insert a batch of missing persons from movie_cast/movie_crew into the persons table.
/// Returns (inserted_count, has_more).
async fn backfill_from_credits_batch(
&self,
batch_size: u32,
) -> Result<(u64, bool), DomainError>;
}
/// Read port — queries persons and credits. No mutations.
@@ -347,6 +353,7 @@ pub trait PersonQuery: Send + Sync {
/// Returns persons who have no remaining entries in movie_cast or movie_crew.
/// Called after movie deletion to find index entries that can be pruned.
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError>;
async fn list_page(&self, limit: u32, offset: u32) -> Result<Vec<Person>, DomainError>;
}
/// Read port — executes search queries. No mutations.
@@ -519,7 +526,7 @@ pub struct WrapUpMovieRow {
pub original_language: Option<String>,
pub genres: Vec<String>,
pub keywords: Vec<String>,
pub cast_names: Vec<(String, u32)>,
pub cast_names: Vec<(String, u32, i64)>,
pub cast_profile_paths: Vec<Option<String>>,
}

View File

@@ -672,6 +672,12 @@ impl PersonCommand for PanicPersonCommand {
async fn upsert_batch(&self, _persons: &[Person]) -> Result<(), DomainError> {
panic!("PanicPersonCommand called")
}
async fn backfill_from_credits_batch(
&self,
_batch_size: u32,
) -> Result<(u64, bool), DomainError> {
panic!("PanicPersonCommand called")
}
}
// ── PanicPersonQuery ──────────────────────────────────────────────────────────
@@ -698,6 +704,10 @@ impl PersonQuery for PanicPersonQuery {
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
panic!("PanicPersonQuery called")
}
async fn list_page(&self, _limit: u32, _offset: u32) -> Result<Vec<Person>, DomainError> {
panic!("PanicPersonQuery called")
}
}
// ── PanicSearchPort ───────────────────────────────────────────────────────────

View File

@@ -332,61 +332,67 @@ pub async fn get_movie_profile(
State(state): State<AppState>,
Path(movie_id): Path<Uuid>,
) -> impl IntoResponse {
let id = domain::value_objects::MovieId::from_uuid(movie_id);
match state.app_ctx.repos.movie_profile.get_by_movie_id(&id).await {
Ok(Some(p)) => Json(MovieProfileResponse {
tmdb_id: p.tmdb_id,
imdb_id: p.imdb_id,
overview: p.overview,
tagline: p.tagline,
runtime_minutes: p.runtime_minutes,
budget_usd: p.budget_usd,
revenue_usd: p.revenue_usd,
vote_average: p.vote_average,
vote_count: p.vote_count,
original_language: p.original_language,
collection_name: p.collection_name,
genres: p
.genres
.into_iter()
.map(|g| GenreDto {
tmdb_id: g.tmdb_id,
name: g.name,
})
.collect(),
keywords: p
.keywords
.into_iter()
.map(|k| KeywordDto {
tmdb_id: k.tmdb_id,
name: k.name,
})
.collect(),
cast: p
.cast
.into_iter()
.map(|c| CastMemberDto {
tmdb_person_id: c.tmdb_person_id,
name: c.name,
character: c.character,
billing_order: c.billing_order,
profile_path: c.profile_path,
})
.collect(),
crew: p
.crew
.into_iter()
.map(|c| CrewMemberDto {
tmdb_person_id: c.tmdb_person_id,
name: c.name,
job: c.job,
department: c.department,
profile_path: c.profile_path,
})
.collect(),
enriched_at: p.enriched_at.to_rfc3339(),
})
.into_response(),
use application::movies::get_movie_profile;
let query = get_movie_profile::GetMovieProfileQuery { movie_id };
match get_movie_profile::execute(&state.app_ctx, query).await {
Ok(Some(result)) => {
let p = result.profile;
Json(MovieProfileResponse {
tmdb_id: p.tmdb_id,
imdb_id: p.imdb_id,
overview: p.overview,
tagline: p.tagline,
runtime_minutes: p.runtime_minutes,
budget_usd: p.budget_usd,
revenue_usd: p.revenue_usd,
vote_average: p.vote_average,
vote_count: p.vote_count,
original_language: p.original_language,
collection_name: p.collection_name,
genres: p
.genres
.into_iter()
.map(|g| GenreDto {
tmdb_id: g.tmdb_id,
name: g.name,
})
.collect(),
keywords: p
.keywords
.into_iter()
.map(|k| KeywordDto {
tmdb_id: k.tmdb_id,
name: k.name,
})
.collect(),
cast: result
.cast
.into_iter()
.map(|c| CastMemberDto {
person_id: c.person_id.value().to_string(),
tmdb_person_id: c.tmdb_person_id,
name: c.name,
character: c.character,
billing_order: c.billing_order,
profile_path: c.profile_path,
})
.collect(),
crew: result
.crew
.into_iter()
.map(|c| CrewMemberDto {
person_id: c.person_id.value().to_string(),
tmdb_person_id: c.tmdb_person_id,
name: c.name,
job: c.job,
department: c.department,
profile_path: c.profile_path,
})
.collect(),
enriched_at: p.enriched_at.to_rfc3339(),
})
.into_response()
}
Ok(None) => StatusCode::NOT_FOUND.into_response(),
Err(e) => crate::errors::domain_error_response(e),
}
@@ -982,6 +988,20 @@ pub async fn get_pending_followers(
}
}
pub async fn post_reindex_search(
State(state): State<AppState>,
_admin: crate::extractors::AdminApiUser,
) -> impl IntoResponse {
let event = domain::events::DomainEvent::SearchReindexRequested;
match state.app_ctx.services.event_publisher.publish(&event).await {
Ok(()) => StatusCode::ACCEPTED,
Err(e) => {
tracing::error!("failed to publish reindex event: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
}
}
}
#[utoipa::path(
get, path = "/api/v1/activity-feed",
params(ActivityFeedQueryParams),
@@ -1032,6 +1052,8 @@ pub async fn list_users(State(state): State<AppState>) -> Result<Json<UsersRespo
.map(|u| UserSummaryDto {
id: u.user_id.value(),
email: u.email().to_string(),
username: u.username().to_string(),
display_name: u.display_name().map(String::from),
total_movies: u.total_movies,
avg_rating: u.avg_rating,
})

View File

@@ -10,5 +10,6 @@ pub fn feed_entry_to_dto(e: &FeedEntry) -> FeedEntryDto {
user_id: e.review().user_id().value(),
user_email: e.user_email().to_string(),
user_display_name: e.user_display_name().to_string(),
is_federated: e.review().is_remote(),
}
}

View File

@@ -4,7 +4,10 @@ use domain::ports::RemoteActorInfo;
use template_askama::{RemoteActorData, RemoteActorDisplay, UserSummaryView};
pub fn user_summary_view(u: &UserSummary) -> UserSummaryView {
let name = u.email().split('@').next().unwrap_or("?").to_string();
let name = u
.display_name()
.map(String::from)
.unwrap_or_else(|| u.username().to_string());
let initial = name.chars().next().unwrap_or('?').to_ascii_uppercase();
let avg_display = u
.avg_rating

View File

@@ -432,6 +432,10 @@ fn api_routes(rate_limit: u64) -> Router<AppState> {
.route(
"/wrapups/{id}/video",
routing::get(handlers::wrapup::get_video),
)
.route(
"/admin/reindex-search",
routing::post(handlers::api::post_reindex_search),
);
#[cfg(feature = "federation")]

View File

@@ -60,6 +60,13 @@ impl domain::ports::PersonQuery for PersonQueryStub {
async fn list_orphaned_persons(&self) -> Result<Vec<domain::models::PersonId>, DomainError> {
Ok(vec![])
}
async fn list_page(
&self,
_limit: u32,
_offset: u32,
) -> Result<Vec<domain::models::Person>, DomainError> {
Ok(vec![])
}
}
// --- Search endpoint tests ---

View File

@@ -431,6 +431,12 @@ impl PersonCommand for Panic {
async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> {
panic!()
}
async fn backfill_from_credits_batch(
&self,
_batch_size: u32,
) -> Result<(u64, bool), DomainError> {
panic!()
}
}
#[async_trait::async_trait]
impl PersonQuery for Panic {
@@ -449,6 +455,13 @@ impl PersonQuery for Panic {
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
panic!()
}
async fn list_page(
&self,
_limit: u32,
_offset: u32,
) -> Result<Vec<domain::models::Person>, DomainError> {
panic!()
}
}
#[async_trait::async_trait]
impl SearchPort for Panic {

View File

@@ -297,6 +297,12 @@ impl PersonCommand for PanicPersonCommand {
async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> {
panic!()
}
async fn backfill_from_credits_batch(
&self,
_batch_size: u32,
) -> Result<(u64, bool), DomainError> {
panic!()
}
}
struct PanicPersonQuery;
@@ -317,6 +323,13 @@ impl PersonQuery for PanicPersonQuery {
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
panic!()
}
async fn list_page(
&self,
_limit: u32,
_offset: u32,
) -> Result<Vec<domain::models::Person>, DomainError> {
panic!()
}
}
struct PanicSearchPort;

View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use anyhow::Context;
use application::{
MovieDiscoveryIndexer, SearchCleanupHandler,
MovieDiscoveryIndexer, SearchCleanupHandler, SearchReindexHandler,
config::AppConfig,
context::{AppContext, Repositories, Services},
worker::WorkerService,
@@ -232,12 +232,15 @@ async fn main() -> anyhow::Result<()> {
let wrapup_handler = Arc::new(
application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()),
) as Arc<dyn EventHandler>;
let reindex_handler =
Arc::new(SearchReindexHandler::new(ctx.clone())) as Arc<dyn EventHandler>;
let mut h = vec![
poster,
cleanup,
search_cleanup,
discovery_indexer,
wrapup_handler,
reindex_handler,
];
if let Some(e) = enrichment_handler {
h.push(e);
@@ -282,6 +285,8 @@ async fn main() -> anyhow::Result<()> {
let wrapup_handler = Arc::new(
application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()),
) as Arc<dyn EventHandler>;
let reindex_handler =
Arc::new(SearchReindexHandler::new(ctx.clone())) as Arc<dyn EventHandler>;
let mut h = vec![
poster,
cleanup,
@@ -290,6 +295,7 @@ async fn main() -> anyhow::Result<()> {
search_cleanup,
discovery_indexer,
wrapup_handler,
reindex_handler,
];
if let Some(e) = enrichment_handler {
h.push(e);
@@ -303,10 +309,15 @@ async fn main() -> anyhow::Result<()> {
// ── Run ───────────────────────────────────────────────────────────────────
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
let _ = shutdown_tx.send(true);
});
let worker = WorkerService::new(consumer_arc, handlers);
tracing::info!("worker started");
worker.run().await;
tracing::info!("worker stopped");
worker.run(shutdown_rx).await;
Ok(())
}

View File

@@ -1,7 +1,7 @@
import { Link } from "@tanstack/react-router"
import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar"
import { Card, CardContent } from "@/components/ui/card"
import { posterUrl } from "@/lib/api/client"
import { tmdbProfileUrl } from "@/lib/api/client"
type PersonRowProps = {
id: string
@@ -16,7 +16,7 @@ export function PersonRow({ id, name, subtitle, imagePath }: PersonRowProps) {
<Card size="sm">
<CardContent className="flex items-center gap-3">
<Avatar>
{imagePath && <AvatarImage src={posterUrl(imagePath)} />}
{imagePath && <AvatarImage src={tmdbProfileUrl(imagePath)} />}
<AvatarFallback>{name[0]?.toUpperCase()}</AvatarFallback>
</Avatar>
<div className="min-w-0 flex-1">

View File

@@ -1,4 +1,5 @@
import { Link } from "@tanstack/react-router"
import { Globe } from "lucide-react"
import { StarDisplay } from "@/components/star-display"
import { Card, CardContent } from "@/components/ui/card"
import { posterUrl } from "@/lib/api/client"
@@ -9,9 +10,10 @@ type ReviewCardProps = {
review: ReviewDto
userName?: string
userId?: string
isFederated?: boolean
}
export function ReviewCard({ movie, review, userName, userId }: ReviewCardProps) {
export function ReviewCard({ movie, review, userName, userId, isFederated }: ReviewCardProps) {
return (
<Card size="sm">
<CardContent className="flex gap-3">
@@ -28,6 +30,7 @@ export function ReviewCard({ movie, review, userName, userId }: ReviewCardProps)
) : (
<span>{userName}</span>
)}
{isFederated && <Globe className="size-3 text-muted-foreground/60" />}
<span>·</span>
<span>{review.watched_at.slice(0, 10)}</span>
</div>

View File

@@ -74,6 +74,7 @@ export function useLogReview() {
mutationFn: (data: LogReviewRequest) => logReview(data),
onSuccess: () => {
qc.invalidateQueries({ queryKey: diaryKeys.all })
qc.invalidateQueries({ queryKey: ["activity-feed"] })
},
})
}
@@ -84,6 +85,7 @@ export function useDeleteReview() {
mutationFn: (id: string) => deleteReview(id),
onSuccess: () => {
qc.invalidateQueries({ queryKey: diaryKeys.all })
qc.invalidateQueries({ queryKey: ["activity-feed"] })
},
})
}

View File

@@ -83,7 +83,7 @@ export function useFollow() {
export function useUnfollow() {
const qc = useQueryClient()
return useMutation({
mutationFn: (data: FollowRequest) => unfollow(data),
mutationFn: (data: ActorUrlRequest) => unfollow(data),
onSuccess: () => {
qc.invalidateQueries({ queryKey: socialKeys.following })
},

View File

@@ -31,6 +31,7 @@ export const feedEntryDtoSchema = z.object({
user_id: z.string().uuid(),
user_email: z.string(),
user_display_name: z.string(),
is_federated: z.boolean(),
})
export type FeedEntryDto = z.infer<typeof feedEntryDtoSchema>

View File

@@ -60,6 +60,7 @@ export const keywordDtoSchema = z.object({
})
export const castMemberDtoSchema = z.object({
person_id: z.string(),
tmdb_person_id: z.number(),
name: z.string(),
character: z.string(),
@@ -69,6 +70,7 @@ export const castMemberDtoSchema = z.object({
export type CastMemberDto = z.infer<typeof castMemberDtoSchema>
export const crewMemberDtoSchema = z.object({
person_id: z.string(),
tmdb_person_id: z.number(),
name: z.string(),
job: z.string(),

View File

@@ -68,7 +68,7 @@ export function follow(data: FollowRequest) {
return post("/social/follow", data)
}
export function unfollow(data: FollowRequest) {
export function unfollow(data: ActorUrlRequest) {
return post("/social/unfollow", data)
}

View File

@@ -1,10 +1,12 @@
import { z } from "zod"
import { diaryEntryDtoSchema, paginatedSchema } from "./common"
import { get, put, putForm } from "./client"
import { get, post, put, putForm } from "./client"
export const userSummaryDtoSchema = z.object({
id: z.string().uuid(),
email: z.string(),
username: z.string(),
display_name: z.string().optional(),
total_movies: z.number(),
avg_rating: z.number().optional(),
})
@@ -130,3 +132,7 @@ export function updateProfile(data: UpdateProfileData) {
export function updateProfileFields(data: UpdateProfileFieldsRequest) {
return put("/profile/fields", data)
}
export function reindexSearch() {
return post("/admin/reindex-search")
}

View File

@@ -47,6 +47,7 @@ export function deleteWrapUp(id: string) {
}
export type MovieRef = {
movie_id?: string
title: string
year: number
runtime_minutes?: number
@@ -54,6 +55,7 @@ export type MovieRef = {
}
export type PersonStat = {
person_id?: string
name: string
count: number
avg_rating: number
@@ -91,6 +93,7 @@ export type WrapUpReport = {
first_movie_of_period?: MovieRef
last_movie_of_period?: MovieRef
poster_paths: string[]
top_cast_profile_paths: string[]
}
export function getWrapUpReport(id: string) {

View File

@@ -19,6 +19,7 @@
"continue": "Continue",
"generate": "Generate",
"generating": "Generating...",
"run": "Run",
"reviews": "{{count}} reviews",
"films": "{{count}} films",
"filmsAvg": "{{count}} films, avg {{avg}}"
@@ -154,7 +155,11 @@
"account": "Account",
"data": "Data",
"integrations": "Integrations",
"socialGroup": "Social"
"socialGroup": "Social",
"admin": "Admin",
"rebuildSearch": "Rebuild Search Index",
"rebuildSearchDesc": "Re-index all movies and people",
"rebuildSearchDone": "Reindex queued"
},
"editProfile": {
"title": "Edit Profile",
@@ -201,6 +206,9 @@
"generateWrapUp": "Generate Wrap-Up",
"startDate": "Start Date",
"endDate": "End Date",
"generateFor": "Generate for",
"forSelf": "Myself",
"forGlobal": "Everyone (global)",
"heroSubtitle": "Your Year in Movies",
"moviesWatched": "movies watched",
"watchHours": "{{hours}} hours of watch time",

View File

@@ -100,6 +100,7 @@ function FeedTab() {
review={entry.review}
userName={entry.user_display_name}
userId={entry.user_id}
isFederated={entry.is_federated}
/>
)
return entry.user_id === auth?.user_id ? (

View File

@@ -1,6 +1,6 @@
import { createFileRoute, Link } from "@tanstack/react-router"
import { useTranslation } from "react-i18next"
import { ArrowLeft, Bookmark, BookmarkCheck, Star, TrendingUp, User, Users } from "lucide-react"
import { ArrowLeft, Bookmark, BookmarkCheck, Globe, Star, TrendingUp, User, Users } from "lucide-react"
import { StarDisplay } from "@/components/star-display"
import { RatingHistogram } from "@/components/rating-histogram"
import { EmptyState } from "@/components/empty-state"
@@ -112,7 +112,10 @@ function MovieDetailPage() {
<CardHeader>
<div className="flex items-center justify-between">
<div>
<CardTitle className="text-sm">{r.user_display}</CardTitle>
<CardTitle className="flex items-center gap-1.5 text-sm">
{r.user_display}
{r.is_federated && <Globe className="size-3 text-muted-foreground/60" />}
</CardTitle>
<CardDescription className="text-[10px]">{r.watched_at.slice(0, 10)}</CardDescription>
</div>
<StarDisplay rating={r.rating} size="xs" />
@@ -233,7 +236,7 @@ function PersonStrip({ items, type }: { items: (CastMemberDto | CrewMemberDto)[]
: (person as CrewMemberDto).job
return (
<div key={`${person.tmdb_person_id}-${i}`} className="w-[72px] flex-shrink-0">
<Link key={`${person.tmdb_person_id}-${i}`} to="/people/$id" params={{ id: person.person_id }} className="w-[72px] flex-shrink-0">
<div className="aspect-[2/3] overflow-hidden rounded-lg bg-muted">
{person.profile_path ? (
<img src={tmdbProfileUrl(person.profile_path)} alt="" className="size-full object-cover" loading="lazy" />
@@ -245,7 +248,7 @@ function PersonStrip({ items, type }: { items: (CastMemberDto | CrewMemberDto)[]
</div>
<p className="mt-1 truncate text-[11px] font-semibold leading-tight">{person.name}</p>
<p className="truncate text-[10px] italic text-muted-foreground">{subtitle}</p>
</div>
</Link>
)
})}
</div>

View File

@@ -52,7 +52,7 @@ function ProfilePage() {
function WrapUpLink() {
const { t } = useTranslation()
const { data } = useWrapUps()
const latest = data?.items?.find((w) => w.status === "completed")
const latest = data?.items?.find((w) => w.status === "Ready")
if (!latest) return null

View File

@@ -1,5 +1,6 @@
import { createFileRoute, Link, useNavigate } from "@tanstack/react-router"
import { useTranslation } from "react-i18next"
import { useMutation } from "@tanstack/react-query"
import {
ArrowLeft,
ChevronRight,
@@ -7,11 +8,14 @@ import {
Globe,
Key,
LogOut,
RefreshCw,
ShieldBan,
Sparkles,
User,
} from "lucide-react"
import { Button } from "@/components/ui/button"
import { useAuth, useIsAdmin } from "@/components/auth-provider"
import { reindexSearch } from "@/lib/api/users"
export const Route = createFileRoute("/_app/settings/")({
component: SettingsPage,
@@ -100,6 +104,8 @@ function SettingsPage() {
<SettingsGroup label={t("settings.integrations")} items={integrations} />
<SettingsGroup label={t("settings.socialGroup")} items={social} />
{isAdmin && <AdminActions />}
<button
onClick={handleLogout}
className="w-full rounded-xl bg-card p-3 text-sm font-medium text-red-400"
@@ -113,6 +119,37 @@ function SettingsPage() {
)
}
function AdminActions() {
const { t } = useTranslation()
const reindex = useMutation({
mutationFn: reindexSearch,
})
return (
<div>
<p className="mb-1.5 px-1 text-xs font-medium text-muted-foreground">
{t("settings.admin")}
</p>
<div className="divide-y divide-border rounded-xl bg-card">
<div className="flex items-center gap-3 p-3">
<span className="text-muted-foreground">
<RefreshCw className={`size-4 ${reindex.isPending ? "animate-spin" : ""}`} />
</span>
<div className="flex-1">
<p className="text-sm font-medium">{t("settings.rebuildSearch")}</p>
<p className="text-xs text-muted-foreground">
{reindex.isSuccess ? t("settings.rebuildSearchDone") : t("settings.rebuildSearchDesc")}
</p>
</div>
<Button variant="outline" size="sm" onClick={() => reindex.mutate()} disabled={reindex.isPending}>
{reindex.isPending ? t("common.generating") : t("common.run")}
</Button>
</div>
</div>
</div>
)
}
function SettingsGroup({
label,
items,

View File

@@ -15,12 +15,14 @@ import { Input } from "@/components/ui/input"
import { Label } from "@/components/ui/label"
import { Skeleton } from "@/components/ui/skeleton"
import { EmptyState } from "@/components/empty-state"
import { useIsAdmin } from "@/components/auth-provider"
import { useAuth, useIsAdmin } from "@/components/auth-provider"
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"
import {
useWrapUps,
useGenerateWrapUp,
useDeleteWrapUp,
} from "@/hooks/use-wrapup"
import { useUsers } from "@/hooks/use-users"
export const Route = createFileRoute("/_app/settings/wrapup")({
component: WrapupPage,
@@ -28,18 +30,22 @@ export const Route = createFileRoute("/_app/settings/wrapup")({
function WrapupPage() {
const { t } = useTranslation()
const { auth } = useAuth()
const isAdmin = useIsAdmin()
const { data, isPending } = useWrapUps()
const generate = useGenerateWrapUp()
const remove = useDeleteWrapUp()
const { data: usersData } = useUsers()
const [open, setOpen] = useState(false)
const [startDate, setStartDate] = useState("")
const [endDate, setEndDate] = useState("")
const [targetUserId, setTargetUserId] = useState<string>("self")
const handleGenerate = () => {
const user_id = targetUserId === "global" ? undefined : targetUserId === "self" ? auth?.user_id : targetUserId
generate.mutate(
{ start_date: startDate, end_date: endDate },
{ start_date: startDate, end_date: endDate, user_id },
{
onSuccess: () => {
setOpen(false)
@@ -81,7 +87,7 @@ function WrapupPage() {
{items.map((w) => (
<Card key={w.id} size="sm">
<CardContent className="flex items-center justify-between">
{w.status === "completed" ? (
{w.status === "Ready" ? (
<Link to="/wrapup/$id" params={{ id: w.id }} className="flex flex-1 items-center justify-between">
<div>
<p className="text-sm font-medium">{w.start_date} {w.end_date}</p>
@@ -133,6 +139,25 @@ function WrapupPage() {
onChange={(e) => setEndDate(e.target.value)}
/>
</div>
{isAdmin && usersData?.users && (
<div className="space-y-1.5">
<Label>{t("wrapup.generateFor")}</Label>
<Select value={targetUserId} onValueChange={setTargetUserId}>
<SelectTrigger>
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="self">{t("wrapup.forSelf")}</SelectItem>
<SelectItem value="global">{t("wrapup.forGlobal")}</SelectItem>
{usersData.users.map((u) => (
<SelectItem key={u.id} value={u.id}>
{u.display_name ?? u.username ?? u.email}
</SelectItem>
))}
</SelectContent>
</Select>
</div>
)}
<Button
onClick={handleGenerate}
disabled={generate.isPending || !startDate || !endDate}

View File

@@ -105,7 +105,7 @@ function OwnFollowingTab() {
<Button
variant="outline"
size="sm"
onClick={() => unfollowMutation.mutate({ handle: actor.handle })}
onClick={() => unfollowMutation.mutate({ actor_url: actor.url })}
disabled={unfollowMutation.isPending}
>
<UserMinus className="mr-1 size-3.5" />
@@ -217,6 +217,15 @@ function UserFollowersTab({ userId }: { userId: string }) {
)
}
function actorHandle(actor: RemoteActorDto): string {
try {
const host = new URL(actor.url).host
return `@${actor.handle}@${host}`
} catch {
return `@${actor.handle}`
}
}
function ActorCard({ actor, action }: { actor: RemoteActorDto; action?: React.ReactNode }) {
const initial = (actor.display_name || actor.handle)[0]?.toUpperCase() ?? "?"
@@ -228,7 +237,7 @@ function ActorCard({ actor, action }: { actor: RemoteActorDto; action?: React.Re
</Avatar>
<div className="min-w-0 flex-1">
<p className="truncate text-sm font-semibold">{actor.display_name || actor.handle}</p>
<p className="truncate text-xs text-muted-foreground">{actor.handle}</p>
<p className="truncate text-xs text-muted-foreground">{actorHandle(actor)}</p>
</div>
{action}
</CardContent>

View File

@@ -41,7 +41,7 @@ function UserProfilePage() {
<Button
size="sm"
variant="outline"
onClick={() => unfollowMutation.mutate({ handle: data.username })}
onClick={() => unfollowMutation.mutate({ actor_url: followingData?.actors.find((a) => a.handle === data.username)?.url ?? "" })}
disabled={unfollowMutation.isPending}
>
<UserCheck className="mr-1 size-3.5" />

View File

@@ -5,7 +5,8 @@ import { Badge } from "@/components/ui/badge"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
import { Skeleton } from "@/components/ui/skeleton"
import { RatingHistogram } from "@/components/rating-histogram"
import { posterUrl } from "@/lib/api/client"
import { posterUrl, tmdbProfileUrl } from "@/lib/api/client"
import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar"
import { useWrapUpReport } from "@/hooks/use-wrapup"
import type { MovieRef, PersonStat } from "@/lib/api/wrapup"
@@ -82,6 +83,7 @@ function WrapUpReportPage() {
title={t("wrapup.topActors")}
subtitle={t("wrapup.uniqueActors", { count: report.actor_diversity })}
items={report.top_actors.slice(0, 5)}
profilePaths={report.top_cast_profile_paths}
/>
)}
@@ -175,7 +177,7 @@ function WrapUpReportPage() {
)
}
function RankCard({ title, subtitle, items }: { title: string; subtitle: string; items: PersonStat[] }) {
function RankCard({ title, subtitle, items, profilePaths }: { title: string; subtitle: string; items: PersonStat[]; profilePaths?: string[] }) {
const { t } = useTranslation()
return (
<Card>
@@ -187,15 +189,38 @@ function RankCard({ title, subtitle, items }: { title: string; subtitle: string;
</CardHeader>
<CardContent>
<ol className="space-y-2">
{items.map((item, i) => (
<li key={item.name} className="flex items-center gap-3">
<span className="flex size-6 items-center justify-center rounded-full bg-muted text-xs font-bold">{i + 1}</span>
<div className="flex-1">
<p className="text-sm font-medium">{item.name}</p>
<p className="text-xs text-muted-foreground">{t("common.filmsAvg", { count: item.count, avg: item.avg_rating.toFixed(1) })}</p>
</div>
</li>
))}
{items.map((item, i) => {
const profilePath = profilePaths?.[i]
return (
<li key={item.name}>
{item.person_id ? (
<Link to="/people/$id" params={{ id: item.person_id }} className="flex items-center gap-3">
<span className="flex size-6 items-center justify-center rounded-full bg-muted text-xs font-bold">{i + 1}</span>
<Avatar className="size-8">
{profilePath && <AvatarImage src={tmdbProfileUrl(profilePath)} />}
<AvatarFallback className="text-xs">{item.name[0]}</AvatarFallback>
</Avatar>
<div className="flex-1">
<p className="text-sm font-medium">{item.name}</p>
<p className="text-xs text-muted-foreground">{t("common.filmsAvg", { count: item.count, avg: item.avg_rating.toFixed(1) })}</p>
</div>
</Link>
) : (
<div className="flex items-center gap-3">
<span className="flex size-6 items-center justify-center rounded-full bg-muted text-xs font-bold">{i + 1}</span>
<Avatar className="size-8">
{profilePath && <AvatarImage src={tmdbProfileUrl(profilePath)} />}
<AvatarFallback className="text-xs">{item.name[0]}</AvatarFallback>
</Avatar>
<div className="flex-1">
<p className="text-sm font-medium">{item.name}</p>
<p className="text-xs text-muted-foreground">{t("common.filmsAvg", { count: item.count, avg: item.avg_rating.toFixed(1) })}</p>
</div>
</div>
)}
</li>
)
})}
</ol>
</CardContent>
</Card>
@@ -204,7 +229,7 @@ function RankCard({ title, subtitle, items }: { title: string; subtitle: string;
function MovieHighlight({ label, movie, showRuntime }: { label: string; movie?: MovieRef; showRuntime?: boolean }) {
if (!movie) return null
return (
const content = (
<div className="overflow-hidden rounded-xl bg-muted">
{movie.poster_path && (
<div className="aspect-[2/3] w-full">
@@ -220,6 +245,10 @@ function MovieHighlight({ label, movie, showRuntime }: { label: string; movie?:
</div>
</div>
)
if (movie.movie_id) {
return <Link to="/movies/$id" params={{ id: movie.movie_id }}>{content}</Link>
}
return content
}
function ReportSkeleton() {