Compare commits

..

2 Commits

Author SHA1 Message Date
01c1082290 feat: SPA bug fixes, interactivity, federation badges, admin reindex
Some checks failed
CI / Check / Test (push) Failing after 10m55s
- fix wrapup status "completed" → "Ready"
- fix unfollow sending {handle} instead of {actor_url}
- fix missing post import in users.ts
- fix feed/activity cache not invalidated on review delete/log
- add person_id to cast/crew types, link to /people pages
- add movie_id to wrapup MovieRef, link highlights to /movies pages
- add wrapup actor profile images + clickable person links
- add federated review globe badge in feed and movie detail
- add fediverse handle (@user@instance) in follower/following cards
- add admin reindex search button in settings
- add wrapup user picker for admins
- add username/display_name to user summary type
- use tmdbProfileUrl for person search results
2026-06-04 14:43:41 +02:00
bd7dc648c4 feat: search reindex, worker improvements, person IDs, user display names
- add admin POST /api/v1/admin/reindex-search endpoint + event-driven handler
- backfill persons from movie_cast/movie_crew into persons table
- paginate person list_page/backfill_from_credits_batch to cap memory
- concurrent worker event dispatch with semaphore (max 8)
- graceful worker shutdown (drain in-flight tasks on SIGINT)
- always ack events, log handler errors as warnings (no infinite retry)
- NATS ack_wait 600s, AtomicBool guard against concurrent reindex
- add username/display_name to UserSummaryDto and users list
- add person_id to CastMemberDto/CrewMemberDto via get_movie_profile use case
- add movie_id to wrapup MovieRef, person_id to wrapup PersonStat
- thread tmdb_person_id through wrapup cast pipeline
- add is_federated to FeedEntryDto
- cap orphaned persons query with LIMIT 500
- add SPA link to classic site footer
2026-06-04 14:43:28 +02:00
54 changed files with 852 additions and 148 deletions

View File

@@ -86,6 +86,7 @@ pub enum EventPayload {
WrapUpCompleted { WrapUpCompleted {
wrapup_id: String, wrapup_id: String,
}, },
SearchReindexRequested,
} }
impl EventPayload { impl EventPayload {
@@ -107,6 +108,7 @@ impl EventPayload {
EventPayload::WatchEventIngested { .. } => "WatchEventIngested", EventPayload::WatchEventIngested { .. } => "WatchEventIngested",
EventPayload::WrapUpRequested { .. } => "WrapUpRequested", EventPayload::WrapUpRequested { .. } => "WrapUpRequested",
EventPayload::WrapUpCompleted { .. } => "WrapUpCompleted", EventPayload::WrapUpCompleted { .. } => "WrapUpCompleted",
EventPayload::SearchReindexRequested => "SearchReindexRequested",
} }
} }
} }
@@ -248,6 +250,7 @@ impl From<&DomainEvent> for EventPayload {
DomainEvent::WrapUpCompleted { wrapup_id } => EventPayload::WrapUpCompleted { DomainEvent::WrapUpCompleted { wrapup_id } => EventPayload::WrapUpCompleted {
wrapup_id: wrapup_id.value().to_string(), wrapup_id: wrapup_id.value().to_string(),
}, },
DomainEvent::SearchReindexRequested => EventPayload::SearchReindexRequested,
} }
} }
} }
@@ -398,6 +401,7 @@ impl TryFrom<EventPayload> for DomainEvent {
wrapup_id: WrapUpId::from_uuid(wid), wrapup_id: WrapUpId::from_uuid(wid),
}) })
} }
EventPayload::SearchReindexRequested => Ok(DomainEvent::SearchReindexRequested),
} }
} }
} }

View File

@@ -120,6 +120,7 @@ impl NatsJetStreamConsumer {
pull::Config { pull::Config {
durable_name: Some(cfg.consumer_name.clone()), durable_name: Some(cfg.consumer_name.clone()),
filter_subject: subject_filter, filter_subject: subject_filter,
ack_wait: std::time::Duration::from_secs(600),
..Default::default() ..Default::default()
}, },
) )

View File

@@ -18,6 +18,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
DomainEvent::WatchEventIngested { .. } => "watch.event.ingested", DomainEvent::WatchEventIngested { .. } => "watch.event.ingested",
DomainEvent::WrapUpRequested { .. } => "wrapup.requested", DomainEvent::WrapUpRequested { .. } => "wrapup.requested",
DomainEvent::WrapUpCompleted { .. } => "wrapup.completed", DomainEvent::WrapUpCompleted { .. } => "wrapup.completed",
DomainEvent::SearchReindexRequested => "search.reindex.requested",
}; };
format!("{prefix}.{suffix}") format!("{prefix}.{suffix}")
} }

View File

@@ -7,7 +7,7 @@ use domain::{
}, },
value_objects::{ value_objects::{
Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear, Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear,
ReviewId, UserId, ReviewId, UserId, Username,
}, },
}; };
use uuid::Uuid; use uuid::Uuid;
@@ -237,6 +237,8 @@ impl MovieStatsRow {
pub(crate) struct UserSummaryRow { pub(crate) struct UserSummaryRow {
pub id: String, pub id: String,
pub email: String, pub email: String,
pub username: String,
pub display_name: Option<String>,
pub total_movies: i64, pub total_movies: i64,
pub avg_rating: Option<f64>, pub avg_rating: Option<f64>,
pub avatar_path: Option<String>, pub avatar_path: Option<String>,
@@ -247,6 +249,8 @@ impl UserSummaryRow {
Ok(UserSummary::new( Ok(UserSummary::new(
UserId::from_uuid(parse_uuid(&self.id)?), UserId::from_uuid(parse_uuid(&self.id)?),
Email::new(self.email)?, Email::new(self.email)?,
Username::new(self.username)?,
self.display_name,
self.total_movies, self.total_movies,
self.avg_rating, self.avg_rating,
self.avatar_path, self.avatar_path,

View File

@@ -57,6 +57,60 @@ impl PersonCommand for PostgresPersonAdapter {
} }
Ok(()) Ok(())
} }
async fn backfill_from_credits_batch(
&self,
batch_size: u32,
) -> Result<(u64, bool), DomainError> {
#[derive(sqlx::FromRow)]
struct MissingPerson {
tmdb_person_id: i64,
name: String,
department: Option<String>,
profile_path: Option<String>,
}
let rows = sqlx::query_as::<_, MissingPerson>(
"SELECT mc.tmdb_person_id, mc.name, 'Acting' AS department, mc.profile_path
FROM movie_cast mc
WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id)
GROUP BY mc.tmdb_person_id, mc.name, mc.profile_path
UNION ALL
SELECT mc.tmdb_person_id, mc.name, mc.department, mc.profile_path
FROM movie_crew mc
WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id)
AND NOT EXISTS (SELECT 1 FROM movie_cast c2 WHERE c2.tmdb_person_id = mc.tmdb_person_id)
GROUP BY mc.tmdb_person_id, mc.name, mc.department, mc.profile_path
LIMIT $1",
)
.bind(batch_size as i64)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
let has_more = rows.len() as u32 >= batch_size;
let mut count = 0u64;
for row in &rows {
let ext = ExternalPersonId::new(format!("tmdb:{}", row.tmdb_person_id));
let pid = PersonId::from_external(&ext);
sqlx::query(
"INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT(tmdb_person_id) DO NOTHING",
)
.bind(pid.value().to_string())
.bind(ext.value())
.bind(row.tmdb_person_id)
.bind(&row.name)
.bind(&row.department)
.bind(&row.profile_path)
.execute(&self.pool)
.await
.map_err(map_err)?;
count += 1;
}
Ok((count, has_more))
}
} }
#[async_trait] #[async_trait]
@@ -206,6 +260,40 @@ impl PersonQuery for PostgresPersonAdapter {
Ok(PersonCredits { person, cast, crew }) Ok(PersonCredits { person, cast, crew })
} }
async fn list_page(&self, limit: u32, offset: u32) -> Result<Vec<Person>, DomainError> {
#[derive(sqlx::FromRow)]
struct Row {
id: String,
external_id: String,
name: String,
known_for_department: Option<String>,
profile_path: Option<String>,
}
let rows = sqlx::query_as::<_, Row>(
"SELECT id, external_id, name, known_for_department, profile_path FROM persons ORDER BY id LIMIT $1 OFFSET $2",
)
.bind(limit as i64)
.bind(offset as i64)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
Ok(rows
.into_iter()
.map(|r| {
let ext = ExternalPersonId::new(r.external_id);
Person::new(
PersonId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()),
ext,
r.name,
r.known_for_department,
r.profile_path,
)
})
.collect())
}
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> { async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
let rows: Vec<(String,)> = sqlx::query_as( let rows: Vec<(String,)> = sqlx::query_as(
"SELECT id FROM persons "SELECT id FROM persons
@@ -214,7 +302,8 @@ impl PersonQuery for PostgresPersonAdapter {
) )
AND NOT EXISTS ( AND NOT EXISTS (
SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id
)", )
LIMIT 500",
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await

View File

@@ -186,13 +186,13 @@ impl UserRepository for PostgresUserRepository {
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> { async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> {
sqlx::query_as::<_, UserSummaryRow>( sqlx::query_as::<_, UserSummaryRow>(
r#"SELECT u.id, u.email, r#"SELECT u.id, u.email, u.username, u.display_name,
COUNT(DISTINCT r.movie_id) AS total_movies, COUNT(DISTINCT r.movie_id) AS total_movies,
AVG(r.rating::float) AS avg_rating, AVG(r.rating::float) AS avg_rating,
u.avatar_path u.avatar_path
FROM users u FROM users u
LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL
GROUP BY u.id, u.email, u.avatar_path GROUP BY u.id, u.email, u.username, u.display_name, u.avatar_path
ORDER BY u.email ASC"#, ORDER BY u.email ASC"#,
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)

View File

@@ -333,9 +333,9 @@ impl WrapUpStatsQuery for PostgresWrapUpStatsQuery {
let keywords = keywords_map.get(&movie_id_str).cloned().unwrap_or_default(); let keywords = keywords_map.get(&movie_id_str).cloned().unwrap_or_default();
let cast = cast_map.get(&movie_id_str).cloned().unwrap_or_default(); let cast = cast_map.get(&movie_id_str).cloned().unwrap_or_default();
let cast_names: Vec<(String, u32)> = cast let cast_names: Vec<(String, u32, i64)> = cast
.iter() .iter()
.map(|c| (c.name.clone(), c.billing_order)) .map(|c| (c.name.clone(), c.billing_order, c.tmdb_person_id))
.collect(); .collect();
let cast_profile_paths: Vec<Option<String>> = let cast_profile_paths: Vec<Option<String>> =
cast.iter().map(|c| c.profile_path.clone()).collect(); cast.iter().map(|c| c.profile_path.clone()).collect();
@@ -367,6 +367,7 @@ impl WrapUpStatsQuery for PostgresWrapUpStatsQuery {
struct CastEntry { struct CastEntry {
name: String, name: String,
billing_order: u32, billing_order: u32,
tmdb_person_id: i64,
profile_path: Option<String>, profile_path: Option<String>,
} }
@@ -417,7 +418,7 @@ async fn fetch_cast_pg(
movie_ids: &[String], movie_ids: &[String],
) -> Result<HashMap<String, Vec<CastEntry>>, DomainError> { ) -> Result<HashMap<String, Vec<CastEntry>>, DomainError> {
let rows = sqlx::query( let rows = sqlx::query(
"SELECT movie_id, name, billing_order, profile_path \ "SELECT movie_id, name, billing_order, tmdb_person_id, profile_path \
FROM movie_cast \ FROM movie_cast \
WHERE movie_id = ANY($1) AND billing_order <= 3 \ WHERE movie_id = ANY($1) AND billing_order <= 3 \
ORDER BY billing_order ASC", ORDER BY billing_order ASC",
@@ -432,10 +433,12 @@ async fn fetch_cast_pg(
let mid: String = row.try_get("movie_id").map_err(map_err)?; let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?; let name: String = row.try_get("name").map_err(map_err)?;
let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?; let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?;
let tmdb_person_id: i64 = row.try_get("tmdb_person_id").map_err(map_err)?;
let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?; let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?;
map.entry(mid).or_default().push(CastEntry { map.entry(mid).or_default().push(CastEntry {
name, name,
billing_order: billing_order as u32, billing_order: billing_order as u32,
tmdb_person_id,
profile_path, profile_path,
}); });
} }

View File

@@ -7,7 +7,7 @@ use domain::{
}, },
value_objects::{ value_objects::{
Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear, Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear,
ReviewId, UserId, WatchlistEntryId, ReviewId, UserId, Username, WatchlistEntryId,
}, },
}; };
use uuid::Uuid; use uuid::Uuid;
@@ -245,6 +245,8 @@ impl FeedRow {
pub(crate) struct UserSummaryRow { pub(crate) struct UserSummaryRow {
pub id: String, pub id: String,
pub email: String, pub email: String,
pub username: String,
pub display_name: Option<String>,
pub total_movies: i64, pub total_movies: i64,
pub avg_rating: Option<f64>, pub avg_rating: Option<f64>,
pub avatar_path: Option<String>, pub avatar_path: Option<String>,
@@ -255,6 +257,8 @@ impl UserSummaryRow {
Ok(UserSummary::new( Ok(UserSummary::new(
UserId::from_uuid(parse_uuid(&self.id)?), UserId::from_uuid(parse_uuid(&self.id)?),
Email::new(self.email)?, Email::new(self.email)?,
Username::new(self.username)?,
self.display_name,
self.total_movies, self.total_movies,
self.avg_rating, self.avg_rating,
self.avatar_path, self.avatar_path,

View File

@@ -57,6 +57,60 @@ impl PersonCommand for SqlitePersonAdapter {
} }
Ok(()) Ok(())
} }
async fn backfill_from_credits_batch(
&self,
batch_size: u32,
) -> Result<(u64, bool), DomainError> {
#[derive(sqlx::FromRow)]
struct MissingPerson {
tmdb_person_id: i64,
name: String,
department: Option<String>,
profile_path: Option<String>,
}
let rows = sqlx::query_as::<_, MissingPerson>(
"SELECT mc.tmdb_person_id, mc.name, 'Acting' AS department, mc.profile_path
FROM movie_cast mc
WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id)
GROUP BY mc.tmdb_person_id
UNION ALL
SELECT mc.tmdb_person_id, mc.name, mc.department, mc.profile_path
FROM movie_crew mc
WHERE NOT EXISTS (SELECT 1 FROM persons WHERE persons.tmdb_person_id = mc.tmdb_person_id)
AND NOT EXISTS (SELECT 1 FROM movie_cast c2 WHERE c2.tmdb_person_id = mc.tmdb_person_id)
GROUP BY mc.tmdb_person_id
LIMIT ?",
)
.bind(batch_size)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
let has_more = rows.len() as u32 >= batch_size;
let mut count = 0u64;
for row in &rows {
let ext = ExternalPersonId::new(format!("tmdb:{}", row.tmdb_person_id));
let pid = PersonId::from_external(&ext);
sqlx::query(
"INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(tmdb_person_id) DO NOTHING",
)
.bind(pid.value().to_string())
.bind(ext.value())
.bind(row.tmdb_person_id)
.bind(&row.name)
.bind(&row.department)
.bind(&row.profile_path)
.execute(&self.pool)
.await
.map_err(map_err)?;
count += 1;
}
Ok((count, has_more))
}
} }
#[async_trait] #[async_trait]
@@ -156,6 +210,19 @@ impl PersonQuery for SqlitePersonAdapter {
Ok(PersonCredits { person, cast, crew }) Ok(PersonCredits { person, cast, crew })
} }
async fn list_page(&self, limit: u32, offset: u32) -> Result<Vec<Person>, DomainError> {
let rows = sqlx::query_as::<_, PersonRow>(
"SELECT id, external_id, name, known_for_department, profile_path FROM persons ORDER BY id LIMIT ? OFFSET ?",
)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
Ok(rows.into_iter().map(PersonRow::into_person).collect())
}
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> { async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
let rows: Vec<(String,)> = sqlx::query_as( let rows: Vec<(String,)> = sqlx::query_as(
"SELECT id FROM persons "SELECT id FROM persons
@@ -164,7 +231,8 @@ impl PersonQuery for SqlitePersonAdapter {
) )
AND NOT EXISTS ( AND NOT EXISTS (
SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id
)", )
LIMIT 500",
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await

View File

@@ -182,17 +182,15 @@ impl UserRepository for SqliteUserRepository {
} }
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> { async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> {
sqlx::query_as!( sqlx::query_as::<_, UserSummaryRow>(
UserSummaryRow, r#"SELECT u.id, u.email, u.username, u.display_name,
r#"SELECT u.id AS "id!: String", COUNT(DISTINCT r.movie_id) AS total_movies,
u.email AS "email!: String",
COUNT(DISTINCT r.movie_id) AS "total_movies!: i64",
AVG(CAST(r.rating AS REAL)) AS avg_rating, AVG(CAST(r.rating AS REAL)) AS avg_rating,
u.avatar_path u.avatar_path
FROM users u FROM users u
LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL
GROUP BY u.id, u.email, u.avatar_path GROUP BY u.id, u.email, u.username, u.display_name, u.avatar_path
ORDER BY u.email ASC"# ORDER BY u.email ASC"#,
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await

View File

@@ -345,9 +345,9 @@ impl WrapUpStatsQuery for SqliteWrapUpStatsQuery {
let keywords = keywords_map.get(&movie_id_str).cloned().unwrap_or_default(); let keywords = keywords_map.get(&movie_id_str).cloned().unwrap_or_default();
let cast = cast_map.get(&movie_id_str).cloned().unwrap_or_default(); let cast = cast_map.get(&movie_id_str).cloned().unwrap_or_default();
let cast_names: Vec<(String, u32)> = cast let cast_names: Vec<(String, u32, i64)> = cast
.iter() .iter()
.map(|c| (c.name.clone(), c.billing_order)) .map(|c| (c.name.clone(), c.billing_order, c.tmdb_person_id))
.collect(); .collect();
let cast_profile_paths: Vec<Option<String>> = let cast_profile_paths: Vec<Option<String>> =
cast.iter().map(|c| c.profile_path.clone()).collect(); cast.iter().map(|c| c.profile_path.clone()).collect();
@@ -379,6 +379,7 @@ impl WrapUpStatsQuery for SqliteWrapUpStatsQuery {
struct CastEntry { struct CastEntry {
name: String, name: String,
billing_order: u32, billing_order: u32,
tmdb_person_id: i64,
profile_path: Option<String>, profile_path: Option<String>,
} }
@@ -453,7 +454,7 @@ async fn fetch_cast_sqlite(
return Ok(HashMap::new()); return Ok(HashMap::new());
} }
let sql = format!( let sql = format!(
"SELECT movie_id, name, billing_order, profile_path \ "SELECT movie_id, name, billing_order, tmdb_person_id, profile_path \
FROM movie_cast \ FROM movie_cast \
WHERE movie_id IN ({}) AND billing_order <= 3 \ WHERE movie_id IN ({}) AND billing_order <= 3 \
ORDER BY billing_order ASC", ORDER BY billing_order ASC",
@@ -470,10 +471,12 @@ async fn fetch_cast_sqlite(
let mid: String = row.try_get("movie_id").map_err(map_err)?; let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?; let name: String = row.try_get("name").map_err(map_err)?;
let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?; let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?;
let tmdb_person_id: i64 = row.try_get("tmdb_person_id").map_err(map_err)?;
let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?; let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?;
map.entry(mid).or_default().push(CastEntry { map.entry(mid).or_default().push(CastEntry {
name, name,
billing_order: billing_order as u32, billing_order: billing_order as u32,
tmdb_person_id,
profile_path, profile_path,
}); });
} }

View File

@@ -63,6 +63,8 @@
{% endif %} {% endif %}
<span class="footer-sep">·</span> <span class="footer-sep">·</span>
<a href="/docs" target="_blank" class="footer-link">API Docs</a> <a href="/docs" target="_blank" class="footer-link">API Docs</a>
<span class="footer-sep">·</span>
<a href="/app/" class="footer-link">Mobile App</a>
</footer> </footer>
</body> </body>
</html> </html>

View File

@@ -57,6 +57,7 @@ pub struct FeedEntryDto {
pub user_id: Uuid, pub user_id: Uuid,
pub user_email: String, pub user_email: String,
pub user_display_name: String, pub user_display_name: String,
pub is_federated: bool,
} }
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]

View File

@@ -40,6 +40,7 @@ pub struct KeywordDto {
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct CastMemberDto { pub struct CastMemberDto {
pub person_id: String,
pub tmdb_person_id: u64, pub tmdb_person_id: u64,
pub name: String, pub name: String,
pub character: String, pub character: String,
@@ -49,6 +50,7 @@ pub struct CastMemberDto {
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct CrewMemberDto { pub struct CrewMemberDto {
pub person_id: String,
pub tmdb_person_id: u64, pub tmdb_person_id: u64,
pub name: String, pub name: String,
pub job: String, pub job: String,

View File

@@ -7,6 +7,8 @@ use crate::diary::{DiaryEntryDto, DiaryResponse};
pub struct UserSummaryDto { pub struct UserSummaryDto {
pub id: Uuid, pub id: Uuid,
pub email: String, pub email: String,
pub username: String,
pub display_name: Option<String>,
pub total_movies: i64, pub total_movies: i64,
pub avg_rating: Option<f64>, pub avg_rating: Option<f64>,
} }

View File

@@ -20,3 +20,4 @@ pub mod test_helpers;
pub use movies::MovieDiscoveryIndexer; pub use movies::MovieDiscoveryIndexer;
pub use movies::SearchCleanupHandler; pub use movies::SearchCleanupHandler;
pub use movies::SearchReindexHandler;

View File

@@ -0,0 +1,78 @@
use domain::{
errors::DomainError,
models::{CastMember, CrewMember, ExternalPersonId, MovieProfile, PersonId},
value_objects::MovieId,
};
use uuid::Uuid;
use crate::context::AppContext;
pub struct GetMovieProfileQuery {
pub movie_id: Uuid,
}
pub struct CastMemberWithId {
pub person_id: PersonId,
pub tmdb_person_id: u64,
pub name: String,
pub character: String,
pub billing_order: u32,
pub profile_path: Option<String>,
}
pub struct CrewMemberWithId {
pub person_id: PersonId,
pub tmdb_person_id: u64,
pub name: String,
pub job: String,
pub department: String,
pub profile_path: Option<String>,
}
pub struct MovieProfileResult {
pub profile: MovieProfile,
pub cast: Vec<CastMemberWithId>,
pub crew: Vec<CrewMemberWithId>,
}
fn resolve_cast(member: &CastMember) -> CastMemberWithId {
let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id));
CastMemberWithId {
person_id: PersonId::from_external(&ext),
tmdb_person_id: member.tmdb_person_id,
name: member.name.clone(),
character: member.character.clone(),
billing_order: member.billing_order,
profile_path: member.profile_path.clone(),
}
}
fn resolve_crew(member: &CrewMember) -> CrewMemberWithId {
let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id));
CrewMemberWithId {
person_id: PersonId::from_external(&ext),
tmdb_person_id: member.tmdb_person_id,
name: member.name.clone(),
job: member.job.clone(),
department: member.department.clone(),
profile_path: member.profile_path.clone(),
}
}
pub async fn execute(
ctx: &AppContext,
query: GetMovieProfileQuery,
) -> Result<Option<MovieProfileResult>, DomainError> {
let movie_id = MovieId::from_uuid(query.movie_id);
let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?;
Ok(profile.map(|p| {
let cast = p.cast.iter().map(resolve_cast).collect();
let crew = p.crew.iter().map(resolve_crew).collect();
MovieProfileResult {
profile: p,
cast,
crew,
}
}))
}

View File

@@ -1,10 +1,13 @@
pub mod commands; pub mod commands;
pub mod discovery_indexer; pub mod discovery_indexer;
pub mod enrich_movie; pub mod enrich_movie;
pub mod get_movie_profile;
pub mod get_movies; pub mod get_movies;
pub mod queries; pub mod queries;
pub mod reindex_search;
pub mod search_cleanup; pub mod search_cleanup;
pub mod sync_poster; pub mod sync_poster;
pub use discovery_indexer::MovieDiscoveryIndexer; pub use discovery_indexer::MovieDiscoveryIndexer;
pub use reindex_search::SearchReindexHandler;
pub use search_cleanup::SearchCleanupHandler; pub use search_cleanup::SearchCleanupHandler;

View File

@@ -0,0 +1,165 @@
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
models::{IndexableDocument, MovieFilter, collections::PageParams},
ports::EventHandler,
};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::context::AppContext;
const BATCH_SIZE: u32 = 500;
pub struct SearchReindexHandler {
ctx: AppContext,
running: AtomicBool,
}
impl SearchReindexHandler {
pub fn new(ctx: AppContext) -> Self {
Self {
ctx,
running: AtomicBool::new(false),
}
}
}
#[async_trait]
impl EventHandler for SearchReindexHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
if !matches!(event, DomainEvent::SearchReindexRequested) {
return Ok(());
}
if self.running.swap(true, Ordering::SeqCst) {
tracing::info!("search reindex already running, skipping");
return Ok(());
}
let result = self.run_reindex().await;
self.running.store(false, Ordering::SeqCst);
result
}
}
impl SearchReindexHandler {
async fn run_reindex(&self) -> Result<(), DomainError> {
tracing::info!("search reindex started");
let movies_indexed = self.reindex_movies().await?;
let backfilled = self.backfill_persons().await?;
if backfilled > 0 {
tracing::info!(backfilled, "backfilled missing persons from credits");
}
let persons_indexed = self.reindex_persons().await?;
tracing::info!(movies_indexed, persons_indexed, "search reindex completed");
Ok(())
}
async fn reindex_movies(&self) -> Result<u64, DomainError> {
let mut count: u64 = 0;
let mut offset: u32 = 0;
loop {
let page = self
.ctx
.repos
.movie
.list_movies(
&PageParams {
limit: BATCH_SIZE,
offset,
},
&MovieFilter::default(),
)
.await?;
for summary in &page.items {
let movie_id = summary.movie.id().clone();
let profile = self
.ctx
.repos
.movie_profile
.get_by_movie_id(&movie_id)
.await?;
if let Err(e) = self
.ctx
.repos
.search_command
.index(IndexableDocument::Movie {
id: movie_id.clone(),
movie: Box::new(summary.movie.clone()),
profile: profile.map(Box::new),
})
.await
{
tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}");
}
count += 1;
}
if (page.items.len() as u32) < BATCH_SIZE {
break;
}
offset += BATCH_SIZE;
tokio::task::yield_now().await;
}
Ok(count)
}
async fn backfill_persons(&self) -> Result<u64, DomainError> {
let mut total = 0u64;
loop {
let (count, has_more) = self
.ctx
.repos
.person_command
.backfill_from_credits_batch(BATCH_SIZE)
.await?;
total += count;
if !has_more {
break;
}
tokio::task::yield_now().await;
}
Ok(total)
}
async fn reindex_persons(&self) -> Result<u64, DomainError> {
let mut count: u64 = 0;
let mut offset: u32 = 0;
loop {
let persons = self
.ctx
.repos
.person_query
.list_page(BATCH_SIZE, offset)
.await?;
for person in &persons {
if let Err(e) = self
.ctx
.repos
.search_command
.index(IndexableDocument::Person {
id: person.id().clone(),
person: Box::new(person.clone()),
})
.await
{
tracing::warn!(person = %person.name(), "reindex person failed: {e}");
}
count += 1;
}
if (persons.len() as u32) < BATCH_SIZE {
break;
}
offset += BATCH_SIZE;
tokio::task::yield_now().await;
}
Ok(count)
}
}

View File

@@ -61,6 +61,7 @@ impl EventHandler for RecordingHandler {
DomainEvent::WatchEventIngested { .. } => "watch_event_ingested", DomainEvent::WatchEventIngested { .. } => "watch_event_ingested",
DomainEvent::WrapUpRequested { .. } => "wrapup_requested", DomainEvent::WrapUpRequested { .. } => "wrapup_requested",
DomainEvent::WrapUpCompleted { .. } => "wrapup_completed", DomainEvent::WrapUpCompleted { .. } => "wrapup_completed",
DomainEvent::SearchReindexRequested => "search_reindex",
}; };
self.calls.lock().unwrap().push(label); self.calls.lock().unwrap().push(label);
Ok(()) Ok(())
@@ -85,34 +86,34 @@ async fn dispatches_to_all_handlers() {
}; };
WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)]) WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)])
.run() .run(tokio::sync::watch::channel(false).1)
.await; .await;
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]); assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
} }
#[tokio::test] #[tokio::test]
async fn nacks_when_handler_fails() { async fn acks_even_when_handler_fails() {
let nack_called = Arc::new(Mutex::new(false)); let ack_called = Arc::new(Mutex::new(false));
struct TrackingAck { struct TrackingAck {
nack_called: Arc<Mutex<bool>>, ack_called: Arc<Mutex<bool>>,
} }
#[async_trait] #[async_trait]
impl AckHandle for TrackingAck { impl AckHandle for TrackingAck {
async fn ack(&self) -> Result<(), DomainError> { async fn ack(&self) -> Result<(), DomainError> {
*self.ack_called.lock().unwrap() = true;
Ok(()) Ok(())
} }
async fn nack(&self) -> Result<(), DomainError> { async fn nack(&self) -> Result<(), DomainError> {
*self.nack_called.lock().unwrap() = true;
Ok(()) Ok(())
} }
} }
struct TrackingConsumer { struct TrackingConsumer {
event: DomainEvent, event: DomainEvent,
nack_called: Arc<Mutex<bool>>, ack_called: Arc<Mutex<bool>>,
} }
impl EventConsumer for TrackingConsumer { impl EventConsumer for TrackingConsumer {
@@ -120,7 +121,7 @@ async fn nacks_when_handler_fails() {
let envelope = EventEnvelope::new( let envelope = EventEnvelope::new(
self.event.clone(), self.event.clone(),
Box::new(TrackingAck { Box::new(TrackingAck {
nack_called: Arc::clone(&self.nack_called), ack_called: Arc::clone(&self.ack_called),
}), }),
); );
Box::pin(stream::iter(vec![Ok(envelope)])) Box::pin(stream::iter(vec![Ok(envelope)]))
@@ -138,14 +139,14 @@ async fn nacks_when_handler_fails() {
let consumer = TrackingConsumer { let consumer = TrackingConsumer {
event: movie_discovered(), event: movie_discovered(),
nack_called: Arc::clone(&nack_called), ack_called: Arc::clone(&ack_called),
}; };
WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)]) WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)])
.run() .run(tokio::sync::watch::channel(false).1)
.await; .await;
assert!(*nack_called.lock().unwrap()); assert!(*ack_called.lock().unwrap());
} }
#[tokio::test] #[tokio::test]
@@ -189,7 +190,9 @@ async fn acks_when_all_handlers_succeed() {
ack_called: Arc::clone(&ack_called), ack_called: Arc::clone(&ack_called),
}; };
WorkerService::new(Arc::new(consumer), vec![]).run().await; WorkerService::new(Arc::new(consumer), vec![])
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(*ack_called.lock().unwrap()); assert!(*ack_called.lock().unwrap());
} }

View File

@@ -5,47 +5,73 @@ use domain::{
ports::{EventConsumer, EventHandler}, ports::{EventConsumer, EventHandler},
}; };
use futures::StreamExt; use futures::StreamExt;
use tokio::sync::Semaphore;
const DEFAULT_CONCURRENCY: usize = 8;
pub struct WorkerService { pub struct WorkerService {
consumer: Arc<dyn EventConsumer>, consumer: Arc<dyn EventConsumer>,
handlers: Vec<Arc<dyn EventHandler>>, handlers: Vec<Arc<dyn EventHandler>>,
semaphore: Arc<Semaphore>,
} }
impl WorkerService { impl WorkerService {
pub fn new(consumer: Arc<dyn EventConsumer>, handlers: Vec<Arc<dyn EventHandler>>) -> Self { pub fn new(consumer: Arc<dyn EventConsumer>, handlers: Vec<Arc<dyn EventHandler>>) -> Self {
Self { consumer, handlers } Self {
consumer,
handlers,
semaphore: Arc::new(Semaphore::new(DEFAULT_CONCURRENCY)),
}
} }
pub async fn run(self) { pub async fn run(self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
let handlers = Arc::new(self.handlers);
let mut tasks = tokio::task::JoinSet::new();
let mut stream = self.consumer.consume(); let mut stream = self.consumer.consume();
while let Some(result) = stream.next().await {
match result {
Ok(envelope) => {
tracing::info!(event = ?envelope.event, "received event");
self.dispatch(envelope).await;
}
Err(e) => tracing::error!("event consumer error: {e}"),
}
}
tracing::info!("event stream ended, worker shutting down");
}
async fn dispatch(&self, envelope: EventEnvelope) { loop {
let mut all_ok = true; tokio::select! {
for handler in &self.handlers { biased;
if let Err(e) = handler.handle(&envelope.event).await { _ = shutdown.changed() => {
tracing::error!("event handler error: {e}"); tracing::info!("shutdown signal received, stopping event consumption");
all_ok = false; break;
}
item = stream.next() => {
match item {
Some(Ok(envelope)) => {
tracing::info!(event = ?envelope.event, "received event");
let permit = self.semaphore.clone().acquire_owned().await;
let Ok(permit) = permit else { break };
let h = Arc::clone(&handlers);
tasks.spawn(async move {
dispatch(h, envelope).await;
drop(permit);
});
}
Some(Err(e)) => tracing::error!("event consumer error: {e}"),
None => break,
}
}
} }
} }
let result = if all_ok {
envelope.ack().await let in_flight = tasks.len();
} else { if in_flight > 0 {
envelope.nack().await tracing::info!(in_flight, "draining in-flight tasks before shutdown");
};
if let Err(e) = result {
tracing::error!("ack/nack failed: {e}");
} }
while tasks.join_next().await.is_some() {}
tracing::info!("worker shut down gracefully");
}
}
async fn dispatch(handlers: Arc<Vec<Arc<dyn EventHandler>>>, envelope: EventEnvelope) {
for handler in handlers.iter() {
if let Err(e) = handler.handle(&envelope.event).await {
tracing::warn!("event handler error (non-fatal): {e}");
}
}
if let Err(e) = envelope.ack().await {
tracing::error!("ack failed: {e}");
} }
} }

View File

@@ -125,6 +125,7 @@ fn build_report(
fn movie_ref(r: &WrapUpMovieRow) -> MovieRef { fn movie_ref(r: &WrapUpMovieRow) -> MovieRef {
MovieRef { MovieRef {
movie_id: Some(r.movie_id),
title: r.title.clone(), title: r.title.clone(),
year: r.release_year, year: r.release_year,
runtime_minutes: r.runtime_minutes, runtime_minutes: r.runtime_minutes,
@@ -233,6 +234,7 @@ fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32) {
let count = ratings.len() as u32; let count = ratings.len() as u32;
let avg = ratings.iter().map(|&r| r as f64).sum::<f64>() / ratings.len() as f64; let avg = ratings.iter().map(|&r| r as f64).sum::<f64>() / ratings.len() as f64;
PersonStat { PersonStat {
person_id: None,
name, name,
count, count,
avg_rating: avg, avg_rating: avg,
@@ -249,12 +251,16 @@ fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32) {
} }
fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32, Vec<String>) { fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32, Vec<String>) {
use domain::models::{ExternalPersonId, PersonId};
let mut actor_movies: HashMap<String, Vec<u8>> = HashMap::new(); let mut actor_movies: HashMap<String, Vec<u8>> = HashMap::new();
let mut actor_profiles: HashMap<String, Option<String>> = HashMap::new(); let mut actor_profiles: HashMap<String, Option<String>> = HashMap::new();
let mut actor_tmdb_ids: HashMap<String, i64> = HashMap::new();
for r in rows { for r in rows {
for (i, (name, billing)) in r.cast_names.iter().enumerate() { for (i, (name, billing, tmdb_id)) in r.cast_names.iter().enumerate() {
if *billing <= 3 { if *billing <= 3 {
actor_movies.entry(name.clone()).or_default().push(r.rating); actor_movies.entry(name.clone()).or_default().push(r.rating);
actor_tmdb_ids.entry(name.clone()).or_insert(*tmdb_id);
if let Some(path) = r.cast_profile_paths.get(i) { if let Some(path) = r.cast_profile_paths.get(i) {
actor_profiles actor_profiles
.entry(name.clone()) .entry(name.clone())
@@ -269,7 +275,12 @@ fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32, Vec<St
.map(|(name, ratings)| { .map(|(name, ratings)| {
let count = ratings.len() as u32; let count = ratings.len() as u32;
let avg = ratings.iter().map(|&r| r as f64).sum::<f64>() / ratings.len() as f64; let avg = ratings.iter().map(|&r| r as f64).sum::<f64>() / ratings.len() as f64;
let person_id = actor_tmdb_ids.get(&name).map(|tid| {
let ext = ExternalPersonId::new(format!("tmdb:{tid}"));
PersonId::from_external(&ext).value()
});
PersonStat { PersonStat {
person_id,
name, name,
count, count,
avg_rating: avg, avg_rating: avg,

View File

@@ -26,7 +26,7 @@ fn make_row(title: &str, rating: u8, watched_at: &str) -> WrapUpMovieRow {
original_language: Some("en".to_string()), original_language: Some("en".to_string()),
genres: vec!["Action".to_string()], genres: vec!["Action".to_string()],
keywords: vec!["heist".to_string()], keywords: vec!["heist".to_string()],
cast_names: vec![("Actor A".to_string(), 1)], cast_names: vec![("Actor A".to_string(), 1, 12345)],
cast_profile_paths: vec![None], cast_profile_paths: vec![None],
} }
} }

View File

@@ -84,6 +84,7 @@ pub enum DomainEvent {
WrapUpCompleted { WrapUpCompleted {
wrapup_id: WrapUpId, wrapup_id: WrapUpId,
}, },
SearchReindexRequested,
} }
#[async_trait] #[async_trait]

View File

@@ -461,6 +461,8 @@ impl FeedEntry {
pub struct UserSummary { pub struct UserSummary {
pub user_id: UserId, pub user_id: UserId,
email: Email, email: Email,
username: Username,
display_name: Option<String>,
pub total_movies: i64, pub total_movies: i64,
pub avg_rating: Option<f64>, pub avg_rating: Option<f64>,
pub avatar_path: Option<String>, pub avatar_path: Option<String>,
@@ -470,6 +472,8 @@ impl UserSummary {
pub fn new( pub fn new(
user_id: UserId, user_id: UserId,
email: Email, email: Email,
username: Username,
display_name: Option<String>,
total_movies: i64, total_movies: i64,
avg_rating: Option<f64>, avg_rating: Option<f64>,
avatar_path: Option<String>, avatar_path: Option<String>,
@@ -477,6 +481,8 @@ impl UserSummary {
Self { Self {
user_id, user_id,
email, email,
username,
display_name,
total_movies, total_movies,
avg_rating, avg_rating,
avatar_path, avatar_path,
@@ -485,6 +491,12 @@ impl UserSummary {
pub fn email(&self) -> &str { pub fn email(&self) -> &str {
self.email.value() self.email.value()
} }
pub fn username(&self) -> &str {
self.username.value()
}
pub fn display_name(&self) -> Option<&str> {
self.display_name.as_deref()
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View File

@@ -36,6 +36,7 @@ impl DateRange {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MovieRef { pub struct MovieRef {
pub movie_id: Option<Uuid>,
pub title: String, pub title: String,
pub year: u16, pub year: u16,
pub runtime_minutes: Option<u32>, pub runtime_minutes: Option<u32>,
@@ -50,6 +51,7 @@ pub struct UserRef {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersonStat { pub struct PersonStat {
pub person_id: Option<Uuid>,
pub name: String, pub name: String,
pub count: u32, pub count: u32,
pub avg_rating: f64, pub avg_rating: f64,

View File

@@ -332,6 +332,12 @@ pub trait ImageRefQuery: Send + Sync {
pub trait PersonCommand: Send + Sync { pub trait PersonCommand: Send + Sync {
/// Upsert a batch of persons. Uses INSERT OR REPLACE (SQLite) / ON CONFLICT DO UPDATE (Postgres). /// Upsert a batch of persons. Uses INSERT OR REPLACE (SQLite) / ON CONFLICT DO UPDATE (Postgres).
async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError>; async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError>;
/// Insert a batch of missing persons from movie_cast/movie_crew into the persons table.
/// Returns (inserted_count, has_more).
async fn backfill_from_credits_batch(
&self,
batch_size: u32,
) -> Result<(u64, bool), DomainError>;
} }
/// Read port — queries persons and credits. No mutations. /// Read port — queries persons and credits. No mutations.
@@ -347,6 +353,7 @@ pub trait PersonQuery: Send + Sync {
/// Returns persons who have no remaining entries in movie_cast or movie_crew. /// Returns persons who have no remaining entries in movie_cast or movie_crew.
/// Called after movie deletion to find index entries that can be pruned. /// Called after movie deletion to find index entries that can be pruned.
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError>; async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError>;
async fn list_page(&self, limit: u32, offset: u32) -> Result<Vec<Person>, DomainError>;
} }
/// Read port — executes search queries. No mutations. /// Read port — executes search queries. No mutations.
@@ -519,7 +526,7 @@ pub struct WrapUpMovieRow {
pub original_language: Option<String>, pub original_language: Option<String>,
pub genres: Vec<String>, pub genres: Vec<String>,
pub keywords: Vec<String>, pub keywords: Vec<String>,
pub cast_names: Vec<(String, u32)>, pub cast_names: Vec<(String, u32, i64)>,
pub cast_profile_paths: Vec<Option<String>>, pub cast_profile_paths: Vec<Option<String>>,
} }

View File

@@ -672,6 +672,12 @@ impl PersonCommand for PanicPersonCommand {
async fn upsert_batch(&self, _persons: &[Person]) -> Result<(), DomainError> { async fn upsert_batch(&self, _persons: &[Person]) -> Result<(), DomainError> {
panic!("PanicPersonCommand called") panic!("PanicPersonCommand called")
} }
async fn backfill_from_credits_batch(
&self,
_batch_size: u32,
) -> Result<(u64, bool), DomainError> {
panic!("PanicPersonCommand called")
}
} }
// ── PanicPersonQuery ────────────────────────────────────────────────────────── // ── PanicPersonQuery ──────────────────────────────────────────────────────────
@@ -698,6 +704,10 @@ impl PersonQuery for PanicPersonQuery {
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> { async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
panic!("PanicPersonQuery called") panic!("PanicPersonQuery called")
} }
async fn list_page(&self, _limit: u32, _offset: u32) -> Result<Vec<Person>, DomainError> {
panic!("PanicPersonQuery called")
}
} }
// ── PanicSearchPort ─────────────────────────────────────────────────────────── // ── PanicSearchPort ───────────────────────────────────────────────────────────

View File

@@ -332,61 +332,67 @@ pub async fn get_movie_profile(
State(state): State<AppState>, State(state): State<AppState>,
Path(movie_id): Path<Uuid>, Path(movie_id): Path<Uuid>,
) -> impl IntoResponse { ) -> impl IntoResponse {
let id = domain::value_objects::MovieId::from_uuid(movie_id); use application::movies::get_movie_profile;
match state.app_ctx.repos.movie_profile.get_by_movie_id(&id).await { let query = get_movie_profile::GetMovieProfileQuery { movie_id };
Ok(Some(p)) => Json(MovieProfileResponse { match get_movie_profile::execute(&state.app_ctx, query).await {
tmdb_id: p.tmdb_id, Ok(Some(result)) => {
imdb_id: p.imdb_id, let p = result.profile;
overview: p.overview, Json(MovieProfileResponse {
tagline: p.tagline, tmdb_id: p.tmdb_id,
runtime_minutes: p.runtime_minutes, imdb_id: p.imdb_id,
budget_usd: p.budget_usd, overview: p.overview,
revenue_usd: p.revenue_usd, tagline: p.tagline,
vote_average: p.vote_average, runtime_minutes: p.runtime_minutes,
vote_count: p.vote_count, budget_usd: p.budget_usd,
original_language: p.original_language, revenue_usd: p.revenue_usd,
collection_name: p.collection_name, vote_average: p.vote_average,
genres: p vote_count: p.vote_count,
.genres original_language: p.original_language,
.into_iter() collection_name: p.collection_name,
.map(|g| GenreDto { genres: p
tmdb_id: g.tmdb_id, .genres
name: g.name, .into_iter()
}) .map(|g| GenreDto {
.collect(), tmdb_id: g.tmdb_id,
keywords: p name: g.name,
.keywords })
.into_iter() .collect(),
.map(|k| KeywordDto { keywords: p
tmdb_id: k.tmdb_id, .keywords
name: k.name, .into_iter()
}) .map(|k| KeywordDto {
.collect(), tmdb_id: k.tmdb_id,
cast: p name: k.name,
.cast })
.into_iter() .collect(),
.map(|c| CastMemberDto { cast: result
tmdb_person_id: c.tmdb_person_id, .cast
name: c.name, .into_iter()
character: c.character, .map(|c| CastMemberDto {
billing_order: c.billing_order, person_id: c.person_id.value().to_string(),
profile_path: c.profile_path, tmdb_person_id: c.tmdb_person_id,
}) name: c.name,
.collect(), character: c.character,
crew: p billing_order: c.billing_order,
.crew profile_path: c.profile_path,
.into_iter() })
.map(|c| CrewMemberDto { .collect(),
tmdb_person_id: c.tmdb_person_id, crew: result
name: c.name, .crew
job: c.job, .into_iter()
department: c.department, .map(|c| CrewMemberDto {
profile_path: c.profile_path, person_id: c.person_id.value().to_string(),
}) tmdb_person_id: c.tmdb_person_id,
.collect(), name: c.name,
enriched_at: p.enriched_at.to_rfc3339(), job: c.job,
}) department: c.department,
.into_response(), profile_path: c.profile_path,
})
.collect(),
enriched_at: p.enriched_at.to_rfc3339(),
})
.into_response()
}
Ok(None) => StatusCode::NOT_FOUND.into_response(), Ok(None) => StatusCode::NOT_FOUND.into_response(),
Err(e) => crate::errors::domain_error_response(e), Err(e) => crate::errors::domain_error_response(e),
} }
@@ -982,6 +988,20 @@ pub async fn get_pending_followers(
} }
} }
pub async fn post_reindex_search(
State(state): State<AppState>,
_admin: crate::extractors::AdminApiUser,
) -> impl IntoResponse {
let event = domain::events::DomainEvent::SearchReindexRequested;
match state.app_ctx.services.event_publisher.publish(&event).await {
Ok(()) => StatusCode::ACCEPTED,
Err(e) => {
tracing::error!("failed to publish reindex event: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
}
}
}
#[utoipa::path( #[utoipa::path(
get, path = "/api/v1/activity-feed", get, path = "/api/v1/activity-feed",
params(ActivityFeedQueryParams), params(ActivityFeedQueryParams),
@@ -1032,6 +1052,8 @@ pub async fn list_users(State(state): State<AppState>) -> Result<Json<UsersRespo
.map(|u| UserSummaryDto { .map(|u| UserSummaryDto {
id: u.user_id.value(), id: u.user_id.value(),
email: u.email().to_string(), email: u.email().to_string(),
username: u.username().to_string(),
display_name: u.display_name().map(String::from),
total_movies: u.total_movies, total_movies: u.total_movies,
avg_rating: u.avg_rating, avg_rating: u.avg_rating,
}) })

View File

@@ -10,5 +10,6 @@ pub fn feed_entry_to_dto(e: &FeedEntry) -> FeedEntryDto {
user_id: e.review().user_id().value(), user_id: e.review().user_id().value(),
user_email: e.user_email().to_string(), user_email: e.user_email().to_string(),
user_display_name: e.user_display_name().to_string(), user_display_name: e.user_display_name().to_string(),
is_federated: e.review().is_remote(),
} }
} }

View File

@@ -4,7 +4,10 @@ use domain::ports::RemoteActorInfo;
use template_askama::{RemoteActorData, RemoteActorDisplay, UserSummaryView}; use template_askama::{RemoteActorData, RemoteActorDisplay, UserSummaryView};
pub fn user_summary_view(u: &UserSummary) -> UserSummaryView { pub fn user_summary_view(u: &UserSummary) -> UserSummaryView {
let name = u.email().split('@').next().unwrap_or("?").to_string(); let name = u
.display_name()
.map(String::from)
.unwrap_or_else(|| u.username().to_string());
let initial = name.chars().next().unwrap_or('?').to_ascii_uppercase(); let initial = name.chars().next().unwrap_or('?').to_ascii_uppercase();
let avg_display = u let avg_display = u
.avg_rating .avg_rating

View File

@@ -432,6 +432,10 @@ fn api_routes(rate_limit: u64) -> Router<AppState> {
.route( .route(
"/wrapups/{id}/video", "/wrapups/{id}/video",
routing::get(handlers::wrapup::get_video), routing::get(handlers::wrapup::get_video),
)
.route(
"/admin/reindex-search",
routing::post(handlers::api::post_reindex_search),
); );
#[cfg(feature = "federation")] #[cfg(feature = "federation")]

View File

@@ -60,6 +60,13 @@ impl domain::ports::PersonQuery for PersonQueryStub {
async fn list_orphaned_persons(&self) -> Result<Vec<domain::models::PersonId>, DomainError> { async fn list_orphaned_persons(&self) -> Result<Vec<domain::models::PersonId>, DomainError> {
Ok(vec![]) Ok(vec![])
} }
async fn list_page(
&self,
_limit: u32,
_offset: u32,
) -> Result<Vec<domain::models::Person>, DomainError> {
Ok(vec![])
}
} }
// --- Search endpoint tests --- // --- Search endpoint tests ---

View File

@@ -431,6 +431,12 @@ impl PersonCommand for Panic {
async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> { async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> {
panic!() panic!()
} }
async fn backfill_from_credits_batch(
&self,
_batch_size: u32,
) -> Result<(u64, bool), DomainError> {
panic!()
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl PersonQuery for Panic { impl PersonQuery for Panic {
@@ -449,6 +455,13 @@ impl PersonQuery for Panic {
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> { async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
panic!() panic!()
} }
async fn list_page(
&self,
_limit: u32,
_offset: u32,
) -> Result<Vec<domain::models::Person>, DomainError> {
panic!()
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl SearchPort for Panic { impl SearchPort for Panic {

View File

@@ -297,6 +297,12 @@ impl PersonCommand for PanicPersonCommand {
async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> { async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> {
panic!() panic!()
} }
async fn backfill_from_credits_batch(
&self,
_batch_size: u32,
) -> Result<(u64, bool), DomainError> {
panic!()
}
} }
struct PanicPersonQuery; struct PanicPersonQuery;
@@ -317,6 +323,13 @@ impl PersonQuery for PanicPersonQuery {
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> { async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
panic!() panic!()
} }
async fn list_page(
&self,
_limit: u32,
_offset: u32,
) -> Result<Vec<domain::models::Person>, DomainError> {
panic!()
}
} }
struct PanicSearchPort; struct PanicSearchPort;

View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use anyhow::Context; use anyhow::Context;
use application::{ use application::{
MovieDiscoveryIndexer, SearchCleanupHandler, MovieDiscoveryIndexer, SearchCleanupHandler, SearchReindexHandler,
config::AppConfig, config::AppConfig,
context::{AppContext, Repositories, Services}, context::{AppContext, Repositories, Services},
worker::WorkerService, worker::WorkerService,
@@ -232,12 +232,15 @@ async fn main() -> anyhow::Result<()> {
let wrapup_handler = Arc::new( let wrapup_handler = Arc::new(
application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()), application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()),
) as Arc<dyn EventHandler>; ) as Arc<dyn EventHandler>;
let reindex_handler =
Arc::new(SearchReindexHandler::new(ctx.clone())) as Arc<dyn EventHandler>;
let mut h = vec![ let mut h = vec![
poster, poster,
cleanup, cleanup,
search_cleanup, search_cleanup,
discovery_indexer, discovery_indexer,
wrapup_handler, wrapup_handler,
reindex_handler,
]; ];
if let Some(e) = enrichment_handler { if let Some(e) = enrichment_handler {
h.push(e); h.push(e);
@@ -282,6 +285,8 @@ async fn main() -> anyhow::Result<()> {
let wrapup_handler = Arc::new( let wrapup_handler = Arc::new(
application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()), application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()),
) as Arc<dyn EventHandler>; ) as Arc<dyn EventHandler>;
let reindex_handler =
Arc::new(SearchReindexHandler::new(ctx.clone())) as Arc<dyn EventHandler>;
let mut h = vec![ let mut h = vec![
poster, poster,
cleanup, cleanup,
@@ -290,6 +295,7 @@ async fn main() -> anyhow::Result<()> {
search_cleanup, search_cleanup,
discovery_indexer, discovery_indexer,
wrapup_handler, wrapup_handler,
reindex_handler,
]; ];
if let Some(e) = enrichment_handler { if let Some(e) = enrichment_handler {
h.push(e); h.push(e);
@@ -303,10 +309,15 @@ async fn main() -> anyhow::Result<()> {
// ── Run ─────────────────────────────────────────────────────────────────── // ── Run ───────────────────────────────────────────────────────────────────
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
let _ = shutdown_tx.send(true);
});
let worker = WorkerService::new(consumer_arc, handlers); let worker = WorkerService::new(consumer_arc, handlers);
tracing::info!("worker started"); tracing::info!("worker started");
worker.run().await; worker.run(shutdown_rx).await;
tracing::info!("worker stopped");
Ok(()) Ok(())
} }

View File

@@ -1,7 +1,7 @@
import { Link } from "@tanstack/react-router" import { Link } from "@tanstack/react-router"
import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar" import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar"
import { Card, CardContent } from "@/components/ui/card" import { Card, CardContent } from "@/components/ui/card"
import { posterUrl } from "@/lib/api/client" import { tmdbProfileUrl } from "@/lib/api/client"
type PersonRowProps = { type PersonRowProps = {
id: string id: string
@@ -16,7 +16,7 @@ export function PersonRow({ id, name, subtitle, imagePath }: PersonRowProps) {
<Card size="sm"> <Card size="sm">
<CardContent className="flex items-center gap-3"> <CardContent className="flex items-center gap-3">
<Avatar> <Avatar>
{imagePath && <AvatarImage src={posterUrl(imagePath)} />} {imagePath && <AvatarImage src={tmdbProfileUrl(imagePath)} />}
<AvatarFallback>{name[0]?.toUpperCase()}</AvatarFallback> <AvatarFallback>{name[0]?.toUpperCase()}</AvatarFallback>
</Avatar> </Avatar>
<div className="min-w-0 flex-1"> <div className="min-w-0 flex-1">

View File

@@ -1,4 +1,5 @@
import { Link } from "@tanstack/react-router" import { Link } from "@tanstack/react-router"
import { Globe } from "lucide-react"
import { StarDisplay } from "@/components/star-display" import { StarDisplay } from "@/components/star-display"
import { Card, CardContent } from "@/components/ui/card" import { Card, CardContent } from "@/components/ui/card"
import { posterUrl } from "@/lib/api/client" import { posterUrl } from "@/lib/api/client"
@@ -9,9 +10,10 @@ type ReviewCardProps = {
review: ReviewDto review: ReviewDto
userName?: string userName?: string
userId?: string userId?: string
isFederated?: boolean
} }
export function ReviewCard({ movie, review, userName, userId }: ReviewCardProps) { export function ReviewCard({ movie, review, userName, userId, isFederated }: ReviewCardProps) {
return ( return (
<Card size="sm"> <Card size="sm">
<CardContent className="flex gap-3"> <CardContent className="flex gap-3">
@@ -28,6 +30,7 @@ export function ReviewCard({ movie, review, userName, userId }: ReviewCardProps)
) : ( ) : (
<span>{userName}</span> <span>{userName}</span>
)} )}
{isFederated && <Globe className="size-3 text-muted-foreground/60" />}
<span>·</span> <span>·</span>
<span>{review.watched_at.slice(0, 10)}</span> <span>{review.watched_at.slice(0, 10)}</span>
</div> </div>

View File

@@ -74,6 +74,7 @@ export function useLogReview() {
mutationFn: (data: LogReviewRequest) => logReview(data), mutationFn: (data: LogReviewRequest) => logReview(data),
onSuccess: () => { onSuccess: () => {
qc.invalidateQueries({ queryKey: diaryKeys.all }) qc.invalidateQueries({ queryKey: diaryKeys.all })
qc.invalidateQueries({ queryKey: ["activity-feed"] })
}, },
}) })
} }
@@ -84,6 +85,7 @@ export function useDeleteReview() {
mutationFn: (id: string) => deleteReview(id), mutationFn: (id: string) => deleteReview(id),
onSuccess: () => { onSuccess: () => {
qc.invalidateQueries({ queryKey: diaryKeys.all }) qc.invalidateQueries({ queryKey: diaryKeys.all })
qc.invalidateQueries({ queryKey: ["activity-feed"] })
}, },
}) })
} }

View File

@@ -83,7 +83,7 @@ export function useFollow() {
export function useUnfollow() { export function useUnfollow() {
const qc = useQueryClient() const qc = useQueryClient()
return useMutation({ return useMutation({
mutationFn: (data: FollowRequest) => unfollow(data), mutationFn: (data: ActorUrlRequest) => unfollow(data),
onSuccess: () => { onSuccess: () => {
qc.invalidateQueries({ queryKey: socialKeys.following }) qc.invalidateQueries({ queryKey: socialKeys.following })
}, },

View File

@@ -31,6 +31,7 @@ export const feedEntryDtoSchema = z.object({
user_id: z.string().uuid(), user_id: z.string().uuid(),
user_email: z.string(), user_email: z.string(),
user_display_name: z.string(), user_display_name: z.string(),
is_federated: z.boolean(),
}) })
export type FeedEntryDto = z.infer<typeof feedEntryDtoSchema> export type FeedEntryDto = z.infer<typeof feedEntryDtoSchema>

View File

@@ -60,6 +60,7 @@ export const keywordDtoSchema = z.object({
}) })
export const castMemberDtoSchema = z.object({ export const castMemberDtoSchema = z.object({
person_id: z.string(),
tmdb_person_id: z.number(), tmdb_person_id: z.number(),
name: z.string(), name: z.string(),
character: z.string(), character: z.string(),
@@ -69,6 +70,7 @@ export const castMemberDtoSchema = z.object({
export type CastMemberDto = z.infer<typeof castMemberDtoSchema> export type CastMemberDto = z.infer<typeof castMemberDtoSchema>
export const crewMemberDtoSchema = z.object({ export const crewMemberDtoSchema = z.object({
person_id: z.string(),
tmdb_person_id: z.number(), tmdb_person_id: z.number(),
name: z.string(), name: z.string(),
job: z.string(), job: z.string(),

View File

@@ -68,7 +68,7 @@ export function follow(data: FollowRequest) {
return post("/social/follow", data) return post("/social/follow", data)
} }
export function unfollow(data: FollowRequest) { export function unfollow(data: ActorUrlRequest) {
return post("/social/unfollow", data) return post("/social/unfollow", data)
} }

View File

@@ -1,10 +1,12 @@
import { z } from "zod" import { z } from "zod"
import { diaryEntryDtoSchema, paginatedSchema } from "./common" import { diaryEntryDtoSchema, paginatedSchema } from "./common"
import { get, put, putForm } from "./client" import { get, post, put, putForm } from "./client"
export const userSummaryDtoSchema = z.object({ export const userSummaryDtoSchema = z.object({
id: z.string().uuid(), id: z.string().uuid(),
email: z.string(), email: z.string(),
username: z.string(),
display_name: z.string().optional(),
total_movies: z.number(), total_movies: z.number(),
avg_rating: z.number().optional(), avg_rating: z.number().optional(),
}) })
@@ -130,3 +132,7 @@ export function updateProfile(data: UpdateProfileData) {
export function updateProfileFields(data: UpdateProfileFieldsRequest) { export function updateProfileFields(data: UpdateProfileFieldsRequest) {
return put("/profile/fields", data) return put("/profile/fields", data)
} }
export function reindexSearch() {
return post("/admin/reindex-search")
}

View File

@@ -47,6 +47,7 @@ export function deleteWrapUp(id: string) {
} }
export type MovieRef = { export type MovieRef = {
movie_id?: string
title: string title: string
year: number year: number
runtime_minutes?: number runtime_minutes?: number
@@ -54,6 +55,7 @@ export type MovieRef = {
} }
export type PersonStat = { export type PersonStat = {
person_id?: string
name: string name: string
count: number count: number
avg_rating: number avg_rating: number
@@ -91,6 +93,7 @@ export type WrapUpReport = {
first_movie_of_period?: MovieRef first_movie_of_period?: MovieRef
last_movie_of_period?: MovieRef last_movie_of_period?: MovieRef
poster_paths: string[] poster_paths: string[]
top_cast_profile_paths: string[]
} }
export function getWrapUpReport(id: string) { export function getWrapUpReport(id: string) {

View File

@@ -19,6 +19,7 @@
"continue": "Continue", "continue": "Continue",
"generate": "Generate", "generate": "Generate",
"generating": "Generating...", "generating": "Generating...",
"run": "Run",
"reviews": "{{count}} reviews", "reviews": "{{count}} reviews",
"films": "{{count}} films", "films": "{{count}} films",
"filmsAvg": "{{count}} films, avg {{avg}}" "filmsAvg": "{{count}} films, avg {{avg}}"
@@ -154,7 +155,11 @@
"account": "Account", "account": "Account",
"data": "Data", "data": "Data",
"integrations": "Integrations", "integrations": "Integrations",
"socialGroup": "Social" "socialGroup": "Social",
"admin": "Admin",
"rebuildSearch": "Rebuild Search Index",
"rebuildSearchDesc": "Re-index all movies and people",
"rebuildSearchDone": "Reindex queued"
}, },
"editProfile": { "editProfile": {
"title": "Edit Profile", "title": "Edit Profile",
@@ -201,6 +206,9 @@
"generateWrapUp": "Generate Wrap-Up", "generateWrapUp": "Generate Wrap-Up",
"startDate": "Start Date", "startDate": "Start Date",
"endDate": "End Date", "endDate": "End Date",
"generateFor": "Generate for",
"forSelf": "Myself",
"forGlobal": "Everyone (global)",
"heroSubtitle": "Your Year in Movies", "heroSubtitle": "Your Year in Movies",
"moviesWatched": "movies watched", "moviesWatched": "movies watched",
"watchHours": "{{hours}} hours of watch time", "watchHours": "{{hours}} hours of watch time",

View File

@@ -100,6 +100,7 @@ function FeedTab() {
review={entry.review} review={entry.review}
userName={entry.user_display_name} userName={entry.user_display_name}
userId={entry.user_id} userId={entry.user_id}
isFederated={entry.is_federated}
/> />
) )
return entry.user_id === auth?.user_id ? ( return entry.user_id === auth?.user_id ? (

View File

@@ -1,6 +1,6 @@
import { createFileRoute, Link } from "@tanstack/react-router" import { createFileRoute, Link } from "@tanstack/react-router"
import { useTranslation } from "react-i18next" import { useTranslation } from "react-i18next"
import { ArrowLeft, Bookmark, BookmarkCheck, Star, TrendingUp, User, Users } from "lucide-react" import { ArrowLeft, Bookmark, BookmarkCheck, Globe, Star, TrendingUp, User, Users } from "lucide-react"
import { StarDisplay } from "@/components/star-display" import { StarDisplay } from "@/components/star-display"
import { RatingHistogram } from "@/components/rating-histogram" import { RatingHistogram } from "@/components/rating-histogram"
import { EmptyState } from "@/components/empty-state" import { EmptyState } from "@/components/empty-state"
@@ -112,7 +112,10 @@ function MovieDetailPage() {
<CardHeader> <CardHeader>
<div className="flex items-center justify-between"> <div className="flex items-center justify-between">
<div> <div>
<CardTitle className="text-sm">{r.user_display}</CardTitle> <CardTitle className="flex items-center gap-1.5 text-sm">
{r.user_display}
{r.is_federated && <Globe className="size-3 text-muted-foreground/60" />}
</CardTitle>
<CardDescription className="text-[10px]">{r.watched_at.slice(0, 10)}</CardDescription> <CardDescription className="text-[10px]">{r.watched_at.slice(0, 10)}</CardDescription>
</div> </div>
<StarDisplay rating={r.rating} size="xs" /> <StarDisplay rating={r.rating} size="xs" />
@@ -233,7 +236,7 @@ function PersonStrip({ items, type }: { items: (CastMemberDto | CrewMemberDto)[]
: (person as CrewMemberDto).job : (person as CrewMemberDto).job
return ( return (
<div key={`${person.tmdb_person_id}-${i}`} className="w-[72px] flex-shrink-0"> <Link key={`${person.tmdb_person_id}-${i}`} to="/people/$id" params={{ id: person.person_id }} className="w-[72px] flex-shrink-0">
<div className="aspect-[2/3] overflow-hidden rounded-lg bg-muted"> <div className="aspect-[2/3] overflow-hidden rounded-lg bg-muted">
{person.profile_path ? ( {person.profile_path ? (
<img src={tmdbProfileUrl(person.profile_path)} alt="" className="size-full object-cover" loading="lazy" /> <img src={tmdbProfileUrl(person.profile_path)} alt="" className="size-full object-cover" loading="lazy" />
@@ -245,7 +248,7 @@ function PersonStrip({ items, type }: { items: (CastMemberDto | CrewMemberDto)[]
</div> </div>
<p className="mt-1 truncate text-[11px] font-semibold leading-tight">{person.name}</p> <p className="mt-1 truncate text-[11px] font-semibold leading-tight">{person.name}</p>
<p className="truncate text-[10px] italic text-muted-foreground">{subtitle}</p> <p className="truncate text-[10px] italic text-muted-foreground">{subtitle}</p>
</div> </Link>
) )
})} })}
</div> </div>

View File

@@ -52,7 +52,7 @@ function ProfilePage() {
function WrapUpLink() { function WrapUpLink() {
const { t } = useTranslation() const { t } = useTranslation()
const { data } = useWrapUps() const { data } = useWrapUps()
const latest = data?.items?.find((w) => w.status === "completed") const latest = data?.items?.find((w) => w.status === "Ready")
if (!latest) return null if (!latest) return null

View File

@@ -1,5 +1,6 @@
import { createFileRoute, Link, useNavigate } from "@tanstack/react-router" import { createFileRoute, Link, useNavigate } from "@tanstack/react-router"
import { useTranslation } from "react-i18next" import { useTranslation } from "react-i18next"
import { useMutation } from "@tanstack/react-query"
import { import {
ArrowLeft, ArrowLeft,
ChevronRight, ChevronRight,
@@ -7,11 +8,14 @@ import {
Globe, Globe,
Key, Key,
LogOut, LogOut,
RefreshCw,
ShieldBan, ShieldBan,
Sparkles, Sparkles,
User, User,
} from "lucide-react" } from "lucide-react"
import { Button } from "@/components/ui/button"
import { useAuth, useIsAdmin } from "@/components/auth-provider" import { useAuth, useIsAdmin } from "@/components/auth-provider"
import { reindexSearch } from "@/lib/api/users"
export const Route = createFileRoute("/_app/settings/")({ export const Route = createFileRoute("/_app/settings/")({
component: SettingsPage, component: SettingsPage,
@@ -100,6 +104,8 @@ function SettingsPage() {
<SettingsGroup label={t("settings.integrations")} items={integrations} /> <SettingsGroup label={t("settings.integrations")} items={integrations} />
<SettingsGroup label={t("settings.socialGroup")} items={social} /> <SettingsGroup label={t("settings.socialGroup")} items={social} />
{isAdmin && <AdminActions />}
<button <button
onClick={handleLogout} onClick={handleLogout}
className="w-full rounded-xl bg-card p-3 text-sm font-medium text-red-400" className="w-full rounded-xl bg-card p-3 text-sm font-medium text-red-400"
@@ -113,6 +119,37 @@ function SettingsPage() {
) )
} }
function AdminActions() {
const { t } = useTranslation()
const reindex = useMutation({
mutationFn: reindexSearch,
})
return (
<div>
<p className="mb-1.5 px-1 text-xs font-medium text-muted-foreground">
{t("settings.admin")}
</p>
<div className="divide-y divide-border rounded-xl bg-card">
<div className="flex items-center gap-3 p-3">
<span className="text-muted-foreground">
<RefreshCw className={`size-4 ${reindex.isPending ? "animate-spin" : ""}`} />
</span>
<div className="flex-1">
<p className="text-sm font-medium">{t("settings.rebuildSearch")}</p>
<p className="text-xs text-muted-foreground">
{reindex.isSuccess ? t("settings.rebuildSearchDone") : t("settings.rebuildSearchDesc")}
</p>
</div>
<Button variant="outline" size="sm" onClick={() => reindex.mutate()} disabled={reindex.isPending}>
{reindex.isPending ? t("common.generating") : t("common.run")}
</Button>
</div>
</div>
</div>
)
}
function SettingsGroup({ function SettingsGroup({
label, label,
items, items,

View File

@@ -15,12 +15,14 @@ import { Input } from "@/components/ui/input"
import { Label } from "@/components/ui/label" import { Label } from "@/components/ui/label"
import { Skeleton } from "@/components/ui/skeleton" import { Skeleton } from "@/components/ui/skeleton"
import { EmptyState } from "@/components/empty-state" import { EmptyState } from "@/components/empty-state"
import { useIsAdmin } from "@/components/auth-provider" import { useAuth, useIsAdmin } from "@/components/auth-provider"
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"
import { import {
useWrapUps, useWrapUps,
useGenerateWrapUp, useGenerateWrapUp,
useDeleteWrapUp, useDeleteWrapUp,
} from "@/hooks/use-wrapup" } from "@/hooks/use-wrapup"
import { useUsers } from "@/hooks/use-users"
export const Route = createFileRoute("/_app/settings/wrapup")({ export const Route = createFileRoute("/_app/settings/wrapup")({
component: WrapupPage, component: WrapupPage,
@@ -28,18 +30,22 @@ export const Route = createFileRoute("/_app/settings/wrapup")({
function WrapupPage() { function WrapupPage() {
const { t } = useTranslation() const { t } = useTranslation()
const { auth } = useAuth()
const isAdmin = useIsAdmin() const isAdmin = useIsAdmin()
const { data, isPending } = useWrapUps() const { data, isPending } = useWrapUps()
const generate = useGenerateWrapUp() const generate = useGenerateWrapUp()
const remove = useDeleteWrapUp() const remove = useDeleteWrapUp()
const { data: usersData } = useUsers()
const [open, setOpen] = useState(false) const [open, setOpen] = useState(false)
const [startDate, setStartDate] = useState("") const [startDate, setStartDate] = useState("")
const [endDate, setEndDate] = useState("") const [endDate, setEndDate] = useState("")
const [targetUserId, setTargetUserId] = useState<string>("self")
const handleGenerate = () => { const handleGenerate = () => {
const user_id = targetUserId === "global" ? undefined : targetUserId === "self" ? auth?.user_id : targetUserId
generate.mutate( generate.mutate(
{ start_date: startDate, end_date: endDate }, { start_date: startDate, end_date: endDate, user_id },
{ {
onSuccess: () => { onSuccess: () => {
setOpen(false) setOpen(false)
@@ -81,7 +87,7 @@ function WrapupPage() {
{items.map((w) => ( {items.map((w) => (
<Card key={w.id} size="sm"> <Card key={w.id} size="sm">
<CardContent className="flex items-center justify-between"> <CardContent className="flex items-center justify-between">
{w.status === "completed" ? ( {w.status === "Ready" ? (
<Link to="/wrapup/$id" params={{ id: w.id }} className="flex flex-1 items-center justify-between"> <Link to="/wrapup/$id" params={{ id: w.id }} className="flex flex-1 items-center justify-between">
<div> <div>
<p className="text-sm font-medium">{w.start_date} {w.end_date}</p> <p className="text-sm font-medium">{w.start_date} {w.end_date}</p>
@@ -133,6 +139,25 @@ function WrapupPage() {
onChange={(e) => setEndDate(e.target.value)} onChange={(e) => setEndDate(e.target.value)}
/> />
</div> </div>
{isAdmin && usersData?.users && (
<div className="space-y-1.5">
<Label>{t("wrapup.generateFor")}</Label>
<Select value={targetUserId} onValueChange={setTargetUserId}>
<SelectTrigger>
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="self">{t("wrapup.forSelf")}</SelectItem>
<SelectItem value="global">{t("wrapup.forGlobal")}</SelectItem>
{usersData.users.map((u) => (
<SelectItem key={u.id} value={u.id}>
{u.display_name ?? u.username ?? u.email}
</SelectItem>
))}
</SelectContent>
</Select>
</div>
)}
<Button <Button
onClick={handleGenerate} onClick={handleGenerate}
disabled={generate.isPending || !startDate || !endDate} disabled={generate.isPending || !startDate || !endDate}

View File

@@ -105,7 +105,7 @@ function OwnFollowingTab() {
<Button <Button
variant="outline" variant="outline"
size="sm" size="sm"
onClick={() => unfollowMutation.mutate({ handle: actor.handle })} onClick={() => unfollowMutation.mutate({ actor_url: actor.url })}
disabled={unfollowMutation.isPending} disabled={unfollowMutation.isPending}
> >
<UserMinus className="mr-1 size-3.5" /> <UserMinus className="mr-1 size-3.5" />
@@ -217,6 +217,15 @@ function UserFollowersTab({ userId }: { userId: string }) {
) )
} }
function actorHandle(actor: RemoteActorDto): string {
try {
const host = new URL(actor.url).host
return `@${actor.handle}@${host}`
} catch {
return `@${actor.handle}`
}
}
function ActorCard({ actor, action }: { actor: RemoteActorDto; action?: React.ReactNode }) { function ActorCard({ actor, action }: { actor: RemoteActorDto; action?: React.ReactNode }) {
const initial = (actor.display_name || actor.handle)[0]?.toUpperCase() ?? "?" const initial = (actor.display_name || actor.handle)[0]?.toUpperCase() ?? "?"
@@ -228,7 +237,7 @@ function ActorCard({ actor, action }: { actor: RemoteActorDto; action?: React.Re
</Avatar> </Avatar>
<div className="min-w-0 flex-1"> <div className="min-w-0 flex-1">
<p className="truncate text-sm font-semibold">{actor.display_name || actor.handle}</p> <p className="truncate text-sm font-semibold">{actor.display_name || actor.handle}</p>
<p className="truncate text-xs text-muted-foreground">{actor.handle}</p> <p className="truncate text-xs text-muted-foreground">{actorHandle(actor)}</p>
</div> </div>
{action} {action}
</CardContent> </CardContent>

View File

@@ -41,7 +41,7 @@ function UserProfilePage() {
<Button <Button
size="sm" size="sm"
variant="outline" variant="outline"
onClick={() => unfollowMutation.mutate({ handle: data.username })} onClick={() => unfollowMutation.mutate({ actor_url: followingData?.actors.find((a) => a.handle === data.username)?.url ?? "" })}
disabled={unfollowMutation.isPending} disabled={unfollowMutation.isPending}
> >
<UserCheck className="mr-1 size-3.5" /> <UserCheck className="mr-1 size-3.5" />

View File

@@ -5,7 +5,8 @@ import { Badge } from "@/components/ui/badge"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card" import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
import { Skeleton } from "@/components/ui/skeleton" import { Skeleton } from "@/components/ui/skeleton"
import { RatingHistogram } from "@/components/rating-histogram" import { RatingHistogram } from "@/components/rating-histogram"
import { posterUrl } from "@/lib/api/client" import { posterUrl, tmdbProfileUrl } from "@/lib/api/client"
import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar"
import { useWrapUpReport } from "@/hooks/use-wrapup" import { useWrapUpReport } from "@/hooks/use-wrapup"
import type { MovieRef, PersonStat } from "@/lib/api/wrapup" import type { MovieRef, PersonStat } from "@/lib/api/wrapup"
@@ -82,6 +83,7 @@ function WrapUpReportPage() {
title={t("wrapup.topActors")} title={t("wrapup.topActors")}
subtitle={t("wrapup.uniqueActors", { count: report.actor_diversity })} subtitle={t("wrapup.uniqueActors", { count: report.actor_diversity })}
items={report.top_actors.slice(0, 5)} items={report.top_actors.slice(0, 5)}
profilePaths={report.top_cast_profile_paths}
/> />
)} )}
@@ -175,7 +177,7 @@ function WrapUpReportPage() {
) )
} }
function RankCard({ title, subtitle, items }: { title: string; subtitle: string; items: PersonStat[] }) { function RankCard({ title, subtitle, items, profilePaths }: { title: string; subtitle: string; items: PersonStat[]; profilePaths?: string[] }) {
const { t } = useTranslation() const { t } = useTranslation()
return ( return (
<Card> <Card>
@@ -187,15 +189,38 @@ function RankCard({ title, subtitle, items }: { title: string; subtitle: string;
</CardHeader> </CardHeader>
<CardContent> <CardContent>
<ol className="space-y-2"> <ol className="space-y-2">
{items.map((item, i) => ( {items.map((item, i) => {
<li key={item.name} className="flex items-center gap-3"> const profilePath = profilePaths?.[i]
<span className="flex size-6 items-center justify-center rounded-full bg-muted text-xs font-bold">{i + 1}</span> return (
<div className="flex-1"> <li key={item.name}>
<p className="text-sm font-medium">{item.name}</p> {item.person_id ? (
<p className="text-xs text-muted-foreground">{t("common.filmsAvg", { count: item.count, avg: item.avg_rating.toFixed(1) })}</p> <Link to="/people/$id" params={{ id: item.person_id }} className="flex items-center gap-3">
</div> <span className="flex size-6 items-center justify-center rounded-full bg-muted text-xs font-bold">{i + 1}</span>
</li> <Avatar className="size-8">
))} {profilePath && <AvatarImage src={tmdbProfileUrl(profilePath)} />}
<AvatarFallback className="text-xs">{item.name[0]}</AvatarFallback>
</Avatar>
<div className="flex-1">
<p className="text-sm font-medium">{item.name}</p>
<p className="text-xs text-muted-foreground">{t("common.filmsAvg", { count: item.count, avg: item.avg_rating.toFixed(1) })}</p>
</div>
</Link>
) : (
<div className="flex items-center gap-3">
<span className="flex size-6 items-center justify-center rounded-full bg-muted text-xs font-bold">{i + 1}</span>
<Avatar className="size-8">
{profilePath && <AvatarImage src={tmdbProfileUrl(profilePath)} />}
<AvatarFallback className="text-xs">{item.name[0]}</AvatarFallback>
</Avatar>
<div className="flex-1">
<p className="text-sm font-medium">{item.name}</p>
<p className="text-xs text-muted-foreground">{t("common.filmsAvg", { count: item.count, avg: item.avg_rating.toFixed(1) })}</p>
</div>
</div>
)}
</li>
)
})}
</ol> </ol>
</CardContent> </CardContent>
</Card> </Card>
@@ -204,7 +229,7 @@ function RankCard({ title, subtitle, items }: { title: string; subtitle: string;
function MovieHighlight({ label, movie, showRuntime }: { label: string; movie?: MovieRef; showRuntime?: boolean }) { function MovieHighlight({ label, movie, showRuntime }: { label: string; movie?: MovieRef; showRuntime?: boolean }) {
if (!movie) return null if (!movie) return null
return ( const content = (
<div className="overflow-hidden rounded-xl bg-muted"> <div className="overflow-hidden rounded-xl bg-muted">
{movie.poster_path && ( {movie.poster_path && (
<div className="aspect-[2/3] w-full"> <div className="aspect-[2/3] w-full">
@@ -220,6 +245,10 @@ function MovieHighlight({ label, movie, showRuntime }: { label: string; movie?:
</div> </div>
</div> </div>
) )
if (movie.movie_id) {
return <Link to="/movies/$id" params={{ id: movie.movie_id }}>{content}</Link>
}
return content
} }
function ReportSkeleton() { function ReportSkeleton() {