diff --git a/crates/adapters/activitypub/src/handler.rs b/crates/adapters/activitypub/src/handler.rs index 242adda..63b85de 100644 --- a/crates/adapters/activitypub/src/handler.rs +++ b/crates/adapters/activitypub/src/handler.rs @@ -7,19 +7,25 @@ use url::Url; use crate::note::ThoughtNote; use crate::urls::ThoughtsUrls; use activitypub_base::ApObjectHandler; -use domain::ports::ActivityPubRepository; +use domain::ports::{ActivityPubRepository, EventPublisher}; use domain::value_objects::UserId; pub struct ThoughtsObjectHandler { repo: Arc, urls: ThoughtsUrls, + event_publisher: Option>, } impl ThoughtsObjectHandler { - pub fn new(repo: Arc, base_url: &str) -> Self { + pub fn new( + repo: Arc, + base_url: &str, + event_publisher: Option>, + ) -> Self { Self { repo, urls: ThoughtsUrls::new(base_url), + event_publisher, } } } @@ -170,11 +176,85 @@ impl ApObjectHandler for ThoughtsObjectHandler { .map_err(|e| anyhow!("{e}")) } - async fn on_like(&self, _object_url: &Url, _actor_url: &Url) -> Result<()> { + async fn on_like(&self, object_url: &Url, actor_url: &Url) -> Result<()> { + let thought_uuid = object_url + .path() + .strip_prefix("/thoughts/") + .and_then(|s| s.split('/').next()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + + let thought_uuid = match thought_uuid { + Some(u) => u, + None => { + tracing::debug!(object = %object_url, "on_like: not a local thought URL, skipping"); + return Ok(()); + } + }; + + let actor_user_id = self + .repo + .find_remote_actor_id(actor_url) + .await + .map_err(|e| anyhow!("{e}"))?; + + let actor_user_id = match actor_user_id { + Some(id) => id, + None => { + tracing::debug!(actor = %actor_url, "on_like: remote actor not interned, skipping notification"); + return Ok(()); + } + }; + + if let Some(ep) = &self.event_publisher { + let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); + let like_id = domain::value_objects::LikeId::new(); + ep.publish(&domain::events::DomainEvent::LikeAdded { + like_id, + user_id: actor_user_id, + thought_id, + }) + .await + .map_err(|e| anyhow!("{e}"))?; + } + Ok(()) } - async fn on_announce_received(&self, _object_url: &Url, _actor_url: &Url) -> Result<()> { + async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> Result<()> { + let thought_uuid = object_url + .path() + .strip_prefix("/thoughts/") + .and_then(|s| s.split('/').next()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + + let thought_uuid = match thought_uuid { + Some(u) => u, + None => return Ok(()), + }; + + let actor_user_id = self + .repo + .find_remote_actor_id(actor_url) + .await + .map_err(|e| anyhow!("{e}"))?; + + let actor_user_id = match actor_user_id { + Some(id) => id, + None => return Ok(()), + }; + + if let Some(ep) = &self.event_publisher { + let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); + let boost_id = domain::value_objects::BoostId::new(); + ep.publish(&domain::events::DomainEvent::BoostAdded { + boost_id, + user_id: actor_user_id, + thought_id, + }) + .await + .map_err(|e| anyhow!("{e}"))?; + } + Ok(()) } diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 335b8aa..e5fd0b9 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -72,6 +72,7 @@ pub async fn build(cfg: &Config) -> Infrastructure { Arc::new(ThoughtsObjectHandler::new( Arc::new(PgActivityPubRepository::new(pool.clone())), &cfg.base_url, + Some(event_publisher.clone()), )), cfg.base_url.clone(), cfg.allow_registration, diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index a2bf1f7..91fb5ad 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -46,6 +46,7 @@ pub async fn build( Arc::new(ThoughtsObjectHandler::new( Arc::new(PgActivityPubRepository::new(pool.clone())), base_url, + None, )), base_url.to_string(), false,