diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 147eafd..a9dfeb5 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -4,7 +4,7 @@ use domain::{ errors::DomainError, events::DomainEvent, ports::LocalApContentQuery, - value_objects::{ReviewId, UserId}, + value_objects::{MovieId, ReviewId, UserId}, }; use std::sync::Arc; @@ -97,6 +97,10 @@ impl EventHandler for ActivityPubEventHandler { .await .map_err(|e| DomainError::InfrastructureError(e.to_string())) } + DomainEvent::PosterSynced { movie_id } => self + .on_poster_synced(movie_id) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())), _ => Ok(()), } } @@ -267,4 +271,44 @@ impl ActivityPubEventHandler { .await?; Ok(()) } + + async fn on_poster_synced(&self, movie_id: &MovieId) -> anyhow::Result<()> { + let entries = self + .content_query + .get_local_reviews_for_movie(movie_id) + .await?; + + let movie = self.content_query.get_movie_by_id(movie_id).await?; + let movie = match movie { + Some(m) => m, + None => return Ok(()), + }; + let poster_url = movie + .poster_path() + .map(|p| format!("{}/images/{}", self.base_url, p.value())); + + for entry in entries { + let review = entry.review(); + let user_id = review.user_id(); + let ap_id = review_url(&self.base_url, review.id()); + let actor = actor_url(&self.base_url, user_id.value()); + + let obj = review_to_ap_object( + review, + ap_id, + actor, + movie.title().value().to_string(), + movie.release_year().value(), + poster_url.clone(), + &self.base_url, + ); + let json = serde_json::to_value(obj)?; + + self.ap_service + .broadcast_update_note(user_id.value(), json, ApVisibility::Public, vec![]) + .await?; + } + + Ok(()) + } } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 8d394c2..276e34b 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -87,6 +87,9 @@ pub enum EventPayload { wrapup_id: String, }, SearchReindexRequested, + PosterSynced { + movie_id: String, + }, } impl EventPayload { @@ -109,6 +112,7 @@ impl EventPayload { EventPayload::WrapUpRequested { .. } => "WrapUpRequested", EventPayload::WrapUpCompleted { .. } => "WrapUpCompleted", EventPayload::SearchReindexRequested => "SearchReindexRequested", + EventPayload::PosterSynced { .. } => "PosterSynced", } } } @@ -251,6 +255,9 @@ impl From<&DomainEvent> for EventPayload { wrapup_id: wrapup_id.value().to_string(), }, DomainEvent::SearchReindexRequested => EventPayload::SearchReindexRequested, + DomainEvent::PosterSynced { movie_id } => EventPayload::PosterSynced { + movie_id: movie_id.value().to_string(), + }, } } } @@ -402,6 +409,9 @@ impl TryFrom for DomainEvent { }) } EventPayload::SearchReindexRequested => Ok(DomainEvent::SearchReindexRequested), + EventPayload::PosterSynced { movie_id } => Ok(DomainEvent::PosterSynced { + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + }), } } } diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index 1f1f127..dad44ca 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -19,6 +19,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::WrapUpRequested { .. } => "wrapup.requested", DomainEvent::WrapUpCompleted { .. } => "wrapup.completed", DomainEvent::SearchReindexRequested => "search.reindex.requested", + DomainEvent::PosterSynced { .. } => "poster.synced", }; format!("{prefix}.{suffix}") } diff --git a/crates/adapters/poster-sync/src/lib.rs b/crates/adapters/poster-sync/src/lib.rs index aea66c7..442cab3 100644 --- a/crates/adapters/poster-sync/src/lib.rs +++ b/crates/adapters/poster-sync/src/lib.rs @@ -82,7 +82,22 @@ impl PosterSyncHandler { let poster_path = PosterPath::new(stored_path)?; movie.update_poster(poster_path); - self.movie_repository.upsert_movie(&movie).await + self.movie_repository.upsert_movie(&movie).await?; + + if let Err(e) = self + .event_publisher + .publish(&DomainEvent::PosterSynced { + movie_id: movie.id().clone(), + }) + .await + { + tracing::warn!( + "failed to emit PosterSynced for {}: {e}", + movie.id().value() + ); + } + + Ok(()) } } diff --git a/crates/adapters/postgres/src/ap_content.rs b/crates/adapters/postgres/src/ap_content.rs index f55ef9a..04b1b39 100644 --- a/crates/adapters/postgres/src/ap_content.rs +++ b/crates/adapters/postgres/src/ap_content.rs @@ -115,6 +115,29 @@ impl LocalApContentQuery for PostgresApContentQuery { .collect() } + async fn get_local_reviews_for_movie( + &self, + movie_id: &MovieId, + ) -> Result, DomainError> { + let mid = movie_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.movie_id = $1 AND r.remote_actor_url IS NULL + ORDER BY r.created_at DESC", + ) + .bind(&mid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + rows.into_iter().map(DiaryRow::into_domain).collect() + } + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { let id = review_id.value().to_string(); sqlx::query_as::<_, ReviewRow>( diff --git a/crates/adapters/sqlite/src/ap_content.rs b/crates/adapters/sqlite/src/ap_content.rs index 5b507e7..92dce80 100644 --- a/crates/adapters/sqlite/src/ap_content.rs +++ b/crates/adapters/sqlite/src/ap_content.rs @@ -67,6 +67,26 @@ impl LocalApContentQuery for SqliteApContentQuery { rows.into_iter().map(WatchlistRow::into_domain).collect() } + async fn get_local_reviews_for_movie( + &self, + movie_id: &MovieId, + ) -> Result, DomainError> { + let mid = movie_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.movie_id = ? AND r.remote_actor_url IS NULL + ORDER BY r.created_at DESC", + ) + .bind(&mid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + rows.into_iter().map(DiaryRow::into_domain).collect() + } + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { let id = review_id.value().to_string(); sqlx::query_as::<_, ReviewRow>( diff --git a/crates/application/src/tests/worker.rs b/crates/application/src/tests/worker.rs index 65f8300..543576c 100644 --- a/crates/application/src/tests/worker.rs +++ b/crates/application/src/tests/worker.rs @@ -62,6 +62,7 @@ impl EventHandler for RecordingHandler { DomainEvent::WrapUpRequested { .. } => "wrapup_requested", DomainEvent::WrapUpCompleted { .. } => "wrapup_completed", DomainEvent::SearchReindexRequested => "search_reindex", + DomainEvent::PosterSynced { .. } => "poster_synced", }; self.calls.lock().unwrap().push(label); Ok(()) diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 9599782..bd7ac82 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -85,6 +85,9 @@ pub enum DomainEvent { wrapup_id: WrapUpId, }, SearchReindexRequested, + PosterSynced { + movie_id: MovieId, + }, } #[async_trait] diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index b5db5ba..afc39e7 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -431,6 +431,11 @@ pub trait LocalApContentQuery: Send + Sync { async fn count_local_posts(&self) -> Result; + async fn get_local_reviews_for_movie( + &self, + movie_id: &MovieId, + ) -> Result, DomainError>; + async fn get_local_reviews_page( &self, user_id: &UserId,