From 886f26c7dc67fe8070f23cb151141ce6a0ba5bfc Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 4 Jun 2026 23:12:27 +0200 Subject: [PATCH] fix: broadcast AP Update after poster sync to fix missing posters on remote instances New movies had no poster at AP broadcast time (race between poster sync and ReviewLogged handler). PosterSynced event now fires after sync completes, triggering Update notes so remote apps get the poster URL. --- .../adapters/activitypub/src/event_handler.rs | 46 ++++++++++++++++++- crates/adapters/event-payload/src/lib.rs | 10 ++++ crates/adapters/nats/src/subject.rs | 1 + crates/adapters/poster-sync/src/lib.rs | 17 ++++++- crates/adapters/postgres/src/ap_content.rs | 23 ++++++++++ crates/adapters/sqlite/src/ap_content.rs | 20 ++++++++ crates/application/src/tests/worker.rs | 1 + crates/domain/src/events.rs | 3 ++ crates/domain/src/ports.rs | 5 ++ 9 files changed, 124 insertions(+), 2 deletions(-) diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 147eafd..a9dfeb5 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -4,7 +4,7 @@ use domain::{ errors::DomainError, events::DomainEvent, ports::LocalApContentQuery, - value_objects::{ReviewId, UserId}, + value_objects::{MovieId, ReviewId, UserId}, }; use std::sync::Arc; @@ -97,6 +97,10 @@ impl EventHandler for ActivityPubEventHandler { .await .map_err(|e| DomainError::InfrastructureError(e.to_string())) } + DomainEvent::PosterSynced { movie_id } => self + .on_poster_synced(movie_id) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())), _ => Ok(()), } } @@ -267,4 +271,44 @@ impl ActivityPubEventHandler { .await?; Ok(()) } + + async fn on_poster_synced(&self, movie_id: &MovieId) -> anyhow::Result<()> { + let entries = self + .content_query + .get_local_reviews_for_movie(movie_id) + .await?; + + let movie = self.content_query.get_movie_by_id(movie_id).await?; + let movie = match movie { + Some(m) => m, + None => return Ok(()), + }; + let poster_url = movie + .poster_path() + .map(|p| format!("{}/images/{}", self.base_url, p.value())); + + for entry in entries { + let review = entry.review(); + let user_id = review.user_id(); + let ap_id = review_url(&self.base_url, review.id()); + let actor = actor_url(&self.base_url, user_id.value()); + + let obj = review_to_ap_object( + review, + ap_id, + actor, + movie.title().value().to_string(), + movie.release_year().value(), + poster_url.clone(), + &self.base_url, + ); + let json = serde_json::to_value(obj)?; + + self.ap_service + .broadcast_update_note(user_id.value(), json, ApVisibility::Public, vec![]) + .await?; + } + + Ok(()) + } } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 8d394c2..276e34b 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -87,6 +87,9 @@ pub enum EventPayload { wrapup_id: String, }, SearchReindexRequested, + PosterSynced { + movie_id: String, + }, } impl EventPayload { @@ -109,6 +112,7 @@ impl EventPayload { EventPayload::WrapUpRequested { .. } => "WrapUpRequested", EventPayload::WrapUpCompleted { .. } => "WrapUpCompleted", EventPayload::SearchReindexRequested => "SearchReindexRequested", + EventPayload::PosterSynced { .. } => "PosterSynced", } } } @@ -251,6 +255,9 @@ impl From<&DomainEvent> for EventPayload { wrapup_id: wrapup_id.value().to_string(), }, DomainEvent::SearchReindexRequested => EventPayload::SearchReindexRequested, + DomainEvent::PosterSynced { movie_id } => EventPayload::PosterSynced { + movie_id: movie_id.value().to_string(), + }, } } } @@ -402,6 +409,9 @@ impl TryFrom for DomainEvent { }) } EventPayload::SearchReindexRequested => Ok(DomainEvent::SearchReindexRequested), + EventPayload::PosterSynced { movie_id } => Ok(DomainEvent::PosterSynced { + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + }), } } } diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index 1f1f127..dad44ca 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -19,6 +19,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::WrapUpRequested { .. } => "wrapup.requested", DomainEvent::WrapUpCompleted { .. } => "wrapup.completed", DomainEvent::SearchReindexRequested => "search.reindex.requested", + DomainEvent::PosterSynced { .. } => "poster.synced", }; format!("{prefix}.{suffix}") } diff --git a/crates/adapters/poster-sync/src/lib.rs b/crates/adapters/poster-sync/src/lib.rs index aea66c7..442cab3 100644 --- a/crates/adapters/poster-sync/src/lib.rs +++ b/crates/adapters/poster-sync/src/lib.rs @@ -82,7 +82,22 @@ impl PosterSyncHandler { let poster_path = PosterPath::new(stored_path)?; movie.update_poster(poster_path); - self.movie_repository.upsert_movie(&movie).await + self.movie_repository.upsert_movie(&movie).await?; + + if let Err(e) = self + .event_publisher + .publish(&DomainEvent::PosterSynced { + movie_id: movie.id().clone(), + }) + .await + { + tracing::warn!( + "failed to emit PosterSynced for {}: {e}", + movie.id().value() + ); + } + + Ok(()) } } diff --git a/crates/adapters/postgres/src/ap_content.rs b/crates/adapters/postgres/src/ap_content.rs index f55ef9a..04b1b39 100644 --- a/crates/adapters/postgres/src/ap_content.rs +++ b/crates/adapters/postgres/src/ap_content.rs @@ -115,6 +115,29 @@ impl LocalApContentQuery for PostgresApContentQuery { .collect() } + async fn get_local_reviews_for_movie( + &self, + movie_id: &MovieId, + ) -> Result, DomainError> { + let mid = movie_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.movie_id = $1 AND r.remote_actor_url IS NULL + ORDER BY r.created_at DESC", + ) + .bind(&mid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + rows.into_iter().map(DiaryRow::into_domain).collect() + } + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { let id = review_id.value().to_string(); sqlx::query_as::<_, ReviewRow>( diff --git a/crates/adapters/sqlite/src/ap_content.rs b/crates/adapters/sqlite/src/ap_content.rs index 5b507e7..92dce80 100644 --- a/crates/adapters/sqlite/src/ap_content.rs +++ b/crates/adapters/sqlite/src/ap_content.rs @@ -67,6 +67,26 @@ impl LocalApContentQuery for SqliteApContentQuery { rows.into_iter().map(WatchlistRow::into_domain).collect() } + async fn get_local_reviews_for_movie( + &self, + movie_id: &MovieId, + ) -> Result, DomainError> { + let mid = movie_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.movie_id = ? AND r.remote_actor_url IS NULL + ORDER BY r.created_at DESC", + ) + .bind(&mid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + rows.into_iter().map(DiaryRow::into_domain).collect() + } + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { let id = review_id.value().to_string(); sqlx::query_as::<_, ReviewRow>( diff --git a/crates/application/src/tests/worker.rs b/crates/application/src/tests/worker.rs index 65f8300..543576c 100644 --- a/crates/application/src/tests/worker.rs +++ b/crates/application/src/tests/worker.rs @@ -62,6 +62,7 @@ impl EventHandler for RecordingHandler { DomainEvent::WrapUpRequested { .. } => "wrapup_requested", DomainEvent::WrapUpCompleted { .. } => "wrapup_completed", DomainEvent::SearchReindexRequested => "search_reindex", + DomainEvent::PosterSynced { .. } => "poster_synced", }; self.calls.lock().unwrap().push(label); Ok(()) diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 9599782..bd7ac82 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -85,6 +85,9 @@ pub enum DomainEvent { wrapup_id: WrapUpId, }, SearchReindexRequested, + PosterSynced { + movie_id: MovieId, + }, } #[async_trait] diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index b5db5ba..afc39e7 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -431,6 +431,11 @@ pub trait LocalApContentQuery: Send + Sync { async fn count_local_posts(&self) -> Result; + async fn get_local_reviews_for_movie( + &self, + movie_id: &MovieId, + ) -> Result, DomainError>; + async fn get_local_reviews_page( &self, user_id: &UserId,