diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index 92d6177..9bc1b77 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -30,6 +30,34 @@ use crate::{ webfinger::webfinger_handler, }; +fn thought_note_json( + thought: &domain::models::thought::Thought, + local_actor: &crate::actors::DbActor, + base_url: &str, +) -> anyhow::Result<(url::Url, serde_json::Value)> { + let ap_id = url::Url::parse(&format!("{}/thoughts/{}", base_url, thought.id))?; + let mut note = serde_json::json!({ + "type": "Note", + "id": ap_id.to_string(), + "attributedTo": local_actor.ap_id.to_string(), + "content": thought.content.as_str(), + "published": thought.created_at.to_rfc3339(), + "to": [crate::urls::AS_PUBLIC], + "cc": [local_actor.followers_url.to_string()], + "sensitive": thought.sensitive, + }); + if let Some(ref cw) = thought.content_warning { + note["summary"] = serde_json::json!(cw); + } + if let Some(ref reply_url) = thought.in_reply_to_url { + note["inReplyTo"] = serde_json::json!(reply_url); + } + if let Some(updated_at) = thought.updated_at { + note["updated"] = serde_json::json!(updated_at.to_rfc3339()); + } + Ok((ap_id, note)) +} + fn collect_inboxes(followers: &[crate::repository::Follower]) -> Vec { let mut seen = std::collections::HashSet::new(); let mut inboxes = Vec::new(); @@ -585,42 +613,6 @@ impl ActivityPubService { .await } - /// Broadcast a single object to all accepted followers as a Create activity. - /// Called by project-specific event handlers when new content is created. - pub async fn broadcast_to_followers( - &self, - local_user_id: uuid::Uuid, - ap_id: Url, - object: serde_json::Value, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { - return Ok(()); - }; - - let create = CreateActivity { - id: ap_id.clone(), - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let create_with_ctx = WithContext::new_default(create); - - let sends = - SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some activity deliveries failed permanently" - ); - } - - Ok(()) - } - /// Broadcast a Delete activity to all accepted followers for a removed review. pub async fn broadcast_delete_to_followers( &self, @@ -716,43 +708,6 @@ impl ActivityPubService { Ok(()) } - /// Broadcast an Update(Note) activity to all accepted followers for an edited review. - pub async fn broadcast_update_to_followers( - &self, - local_user_id: uuid::Uuid, - object: serde_json::Value, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { - return Ok(()); - }; - - let update_id = Url::parse(&format!( - "{}/activities/update/{}", - self.base_url, - uuid::Uuid::new_v4() - ))?; - let update = crate::activities::UpdateActivity { - id: update_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let update_with_ctx = WithContext::new_default(update); - let sends = - SendActivityTask::prepare(&update_with_ctx, &local_actor, inboxes, &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some update activity deliveries failed" - ); - } - Ok(()) - } - pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> { use activitypub_federation::traits::Object; @@ -1202,33 +1157,38 @@ impl domain::ports::OutboundFederationPort for ActivityPubService { ) -> Result<(), domain::errors::DomainError> { let user_uuid = author_user_id.as_uuid(); let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(user_uuid, &data) + let Some((local_actor, inboxes)) = self + .accepted_follower_inboxes(&data, user_uuid) .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))? + else { + return Ok(()); + }; + + let (ap_id, note) = thought_note_json(thought, &local_actor, &self.base_url) .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let ap_id = url::Url::parse(&format!("{}/thoughts/{}", self.base_url, thought.id)) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - - let mut note = serde_json::json!({ - "type": "Note", - "id": ap_id.to_string(), - "attributedTo": local_actor.ap_id.to_string(), - "content": thought.content.as_str(), - "published": thought.created_at.to_rfc3339(), - "to": [crate::urls::AS_PUBLIC], - "cc": [local_actor.followers_url.to_string()], - "sensitive": thought.sensitive, - }); - if let Some(ref cw) = thought.content_warning { - note["summary"] = serde_json::json!(cw); + let create = crate::activities::CreateActivity { + id: ap_id, + kind: Default::default(), + actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()), + object: note, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( + &activitypub_federation::protocol::context::WithContext::new_default(create), + &local_actor, + inboxes, + &data, + ) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some Create deliveries failed"); } - if let Some(ref reply_url) = thought.in_reply_to_url { - note["inReplyTo"] = serde_json::json!(reply_url); - } - - self.broadcast_to_followers(user_uuid, ap_id, note) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) + Ok(()) } async fn broadcast_delete( @@ -1254,35 +1214,44 @@ impl domain::ports::OutboundFederationPort for ActivityPubService { ) -> Result<(), domain::errors::DomainError> { let user_uuid = author_user_id.as_uuid(); let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(user_uuid, &data) + let Some((local_actor, inboxes)) = self + .accepted_follower_inboxes(&data, user_uuid) .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))? + else { + return Ok(()); + }; + + let (_ap_id, note) = thought_note_json(thought, &local_actor, &self.base_url) .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let ap_id = url::Url::parse(&format!("{}/thoughts/{}", self.base_url, thought.id)) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let mut note = serde_json::json!({ - "type": "Note", - "id": ap_id.to_string(), - "attributedTo": local_actor.ap_id.to_string(), - "content": thought.content.as_str(), - "published": thought.created_at.to_rfc3339(), - "to": [crate::urls::AS_PUBLIC], - "cc": [local_actor.followers_url.to_string()], - "sensitive": thought.sensitive, - }); - if let Some(ref cw) = thought.content_warning { - note["summary"] = serde_json::json!(cw); + let update_id = url::Url::parse(&format!( + "{}/activities/update/{}", + self.base_url, + uuid::Uuid::new_v4() + )) + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + let update = crate::activities::UpdateActivity { + id: update_id, + kind: Default::default(), + actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()), + object: note, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( + &activitypub_federation::protocol::context::WithContext::new_default(update), + &local_actor, + inboxes, + &data, + ) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some Update deliveries failed"); } - if let Some(ref reply_url) = thought.in_reply_to_url { - note["inReplyTo"] = serde_json::json!(reply_url); - } - if let Some(updated_at) = thought.updated_at { - note["updated"] = serde_json::json!(updated_at.to_rfc3339()); - } - - self.broadcast_update_to_followers(user_uuid, note) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) + Ok(()) } async fn broadcast_announce(