use anyhow::{anyhow, Result}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use std::sync::Arc; use url::Url; use crate::note::ThoughtNote; use crate::urls::ThoughtsUrls; use activitypub_base::ApObjectHandler; use domain::ports::{ActivityPubRepository, EventPublisher}; use domain::value_objects::UserId; pub struct ThoughtsObjectHandler { repo: Arc, urls: ThoughtsUrls, event_publisher: Option>, } impl ThoughtsObjectHandler { pub fn new( repo: Arc, base_url: &str, event_publisher: Option>, ) -> Self { Self { repo, urls: ThoughtsUrls::new(base_url), event_publisher, } } } #[async_trait] impl ApObjectHandler for ThoughtsObjectHandler { async fn get_local_objects_for_user( &self, user_id: uuid::Uuid, ) -> Result> { let uid = UserId::from_uuid(user_id); let entries = self .repo .outbox_entries_for_actor(&uid) .await .map_err(|e| anyhow!("{e}"))?; entries .into_iter() .map(|e| { let note_url = self.urls.thought_url(e.thought.id.as_uuid()); let actor_url = self.urls.user_url(e.author_username.as_str()); let followers = self.urls.user_followers(e.author_username.as_str()); let in_reply_to = e .thought .in_reply_to_id .map(|id| self.urls.thought_url(id.as_uuid())); let note = ThoughtNote::new_public( note_url.clone(), actor_url, e.thought.content.as_str().to_owned(), e.thought.created_at, in_reply_to, e.thought.sensitive, e.thought.content_warning, followers, ); Ok((note_url, serde_json::to_value(¬e)?)) }) .collect() } async fn get_local_objects_page( &self, user_id: uuid::Uuid, before: Option>, limit: usize, ) -> Result)>> { let uid = UserId::from_uuid(user_id); let entries = self .repo .outbox_page_for_actor(&uid, before, limit) .await .map_err(|e| anyhow!("{e}"))?; entries .into_iter() .map(|e| { let created_at = e.thought.created_at; let note_url = self.urls.thought_url(e.thought.id.as_uuid()); let actor_url = self.urls.user_url(e.author_username.as_str()); let followers = self.urls.user_followers(e.author_username.as_str()); let in_reply_to = e .thought .in_reply_to_id .map(|id| self.urls.thought_url(id.as_uuid())); let note = ThoughtNote::new_public( note_url.clone(), actor_url, e.thought.content.as_str().to_owned(), created_at, in_reply_to, e.thought.sensitive, e.thought.content_warning, followers, ); Ok((note_url, serde_json::to_value(¬e)?, created_at)) }) .collect() } async fn on_create( &self, ap_id: &Url, actor_url: &Url, object: serde_json::Value, ) -> Result<()> { let note: ThoughtNote = serde_json::from_value(object)?; let author_id = self .repo .intern_remote_actor(actor_url) .await .map_err(|e| anyhow!("{e}"))?; // Derive visibility from AP addressing conventions. let as_public = "https://www.w3.org/ns/activitystreams#Public"; let in_to = note.to.iter().any(|s| s == as_public); let in_cc = note.cc.iter().any(|s| s == as_public); let has_followers = note.to.iter().any(|s| s.ends_with("/followers")) || note.cc.iter().any(|s| s.ends_with("/followers")); let visibility = if in_to { "public" } else if in_cc { "unlisted" } else if has_followers { "followers" } else { "direct" }; self.repo .accept_note( ap_id, &author_id, ¬e.content, note.published, note.sensitive, note.summary, visibility, note.in_reply_to.as_ref(), ) .await .map_err(|e| anyhow!("{e}"))?; // Fire mention notifications for local @mentions in the note's tag array. let base_url = url::Url::parse(&self.urls.base_url) .ok() .and_then(|u| u.host_str().map(|h| h.to_string())) .unwrap_or_default(); for tag in ¬e.tag { if tag.get("type").and_then(|t| t.as_str()) != Some("Mention") { continue; } let href = match tag.get("href").and_then(|h| h.as_str()) { Some(h) => h, None => continue, }; let href_url = match url::Url::parse(href) { Ok(u) => u, Err(_) => continue, }; if href_url.host_str().unwrap_or("") != base_url { continue; } let user_uuid = href_url .path() .strip_prefix("/users/") .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(uuid) = user_uuid { self.on_mention(ap_id, uuid, actor_url) .await .unwrap_or_else(|e| { tracing::warn!(error = %e, "failed to process mention notification"); }); } } Ok(()) } async fn on_update( &self, ap_id: &Url, _actor_url: &Url, object: serde_json::Value, ) -> Result<()> { let note: ThoughtNote = serde_json::from_value(object)?; self.repo .apply_note_update(ap_id, ¬e.content) .await .map_err(|e| anyhow!("{e}")) } async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> { self.repo .retract_note(ap_id) .await .map_err(|e| anyhow!("{e}")) } async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> { self.repo .retract_actor_notes(actor_url) .await .map_err(|e| anyhow!("{e}")) } async fn on_like(&self, object_url: &Url, actor_url: &Url) -> Result<()> { let thought_uuid = object_url .path() .strip_prefix("/thoughts/") .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let thought_uuid = match thought_uuid { Some(u) => u, None => { tracing::debug!(object = %object_url, "on_like: not a local thought URL, skipping"); return Ok(()); } }; let actor_user_id = self .repo .find_remote_actor_id(actor_url) .await .map_err(|e| anyhow!("{e}"))?; let actor_user_id = match actor_user_id { Some(id) => id, None => { tracing::debug!(actor = %actor_url, "on_like: remote actor not interned, skipping notification"); return Ok(()); } }; if let Some(ep) = &self.event_publisher { let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); let like_id = domain::value_objects::LikeId::new(); ep.publish(&domain::events::DomainEvent::LikeAdded { like_id, user_id: actor_user_id, thought_id, }) .await .map_err(|e| anyhow!("{e}"))?; } Ok(()) } async fn on_unlike(&self, _object_url: &url::Url, _actor_url: &url::Url) -> anyhow::Result<()> { Ok(()) } async fn on_mention( &self, thought_ap_id: &url::Url, mentioned_user_uuid: uuid::Uuid, actor_url: &url::Url, ) -> anyhow::Result<()> { let author_user_id = match self .repo .find_remote_actor_id(actor_url) .await .map_err(|e| anyhow!("{e}"))? { Some(id) => id, None => return Ok(()), }; let thought_uuid = thought_ap_id .path() .strip_prefix("/thoughts/") .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let thought_uuid = match thought_uuid { Some(u) => u, None => return Ok(()), }; if let Some(ep) = &self.event_publisher { ep.publish(&domain::events::DomainEvent::MentionReceived { thought_id: domain::value_objects::ThoughtId::from_uuid(thought_uuid), mentioned_user_id: domain::value_objects::UserId::from_uuid(mentioned_user_uuid), author_user_id, }) .await .map_err(|e| anyhow!("{e}"))?; } Ok(()) } async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> Result<()> { let thought_uuid = object_url .path() .strip_prefix("/thoughts/") .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let thought_uuid = match thought_uuid { Some(u) => u, None => return Ok(()), }; let actor_user_id = self .repo .find_remote_actor_id(actor_url) .await .map_err(|e| anyhow!("{e}"))?; let actor_user_id = match actor_user_id { Some(id) => id, None => return Ok(()), }; if let Some(ep) = &self.event_publisher { let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); let boost_id = domain::value_objects::BoostId::new(); ep.publish(&domain::events::DomainEvent::BoostAdded { boost_id, user_id: actor_user_id, thought_id, }) .await .map_err(|e| anyhow!("{e}"))?; } Ok(()) } async fn count_local_posts(&self) -> Result { self.repo .count_local_notes() .await .map_err(|e| anyhow!("{e}")) } }