refactor(activitypub-base): eliminate double get_local_actor — extract thought_note_json, remove dead broadcast_to_followers methods
This commit is contained in:
@@ -30,6 +30,34 @@ use crate::{
|
|||||||
webfinger::webfinger_handler,
|
webfinger::webfinger_handler,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
fn thought_note_json(
|
||||||
|
thought: &domain::models::thought::Thought,
|
||||||
|
local_actor: &crate::actors::DbActor,
|
||||||
|
base_url: &str,
|
||||||
|
) -> anyhow::Result<(url::Url, serde_json::Value)> {
|
||||||
|
let ap_id = url::Url::parse(&format!("{}/thoughts/{}", base_url, thought.id))?;
|
||||||
|
let mut note = serde_json::json!({
|
||||||
|
"type": "Note",
|
||||||
|
"id": ap_id.to_string(),
|
||||||
|
"attributedTo": local_actor.ap_id.to_string(),
|
||||||
|
"content": thought.content.as_str(),
|
||||||
|
"published": thought.created_at.to_rfc3339(),
|
||||||
|
"to": [crate::urls::AS_PUBLIC],
|
||||||
|
"cc": [local_actor.followers_url.to_string()],
|
||||||
|
"sensitive": thought.sensitive,
|
||||||
|
});
|
||||||
|
if let Some(ref cw) = thought.content_warning {
|
||||||
|
note["summary"] = serde_json::json!(cw);
|
||||||
|
}
|
||||||
|
if let Some(ref reply_url) = thought.in_reply_to_url {
|
||||||
|
note["inReplyTo"] = serde_json::json!(reply_url);
|
||||||
|
}
|
||||||
|
if let Some(updated_at) = thought.updated_at {
|
||||||
|
note["updated"] = serde_json::json!(updated_at.to_rfc3339());
|
||||||
|
}
|
||||||
|
Ok((ap_id, note))
|
||||||
|
}
|
||||||
|
|
||||||
fn collect_inboxes(followers: &[crate::repository::Follower]) -> Vec<Url> {
|
fn collect_inboxes(followers: &[crate::repository::Follower]) -> Vec<Url> {
|
||||||
let mut seen = std::collections::HashSet::new();
|
let mut seen = std::collections::HashSet::new();
|
||||||
let mut inboxes = Vec::new();
|
let mut inboxes = Vec::new();
|
||||||
@@ -585,42 +613,6 @@ impl ActivityPubService {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Broadcast a single object to all accepted followers as a Create activity.
|
|
||||||
/// Called by project-specific event handlers when new content is created.
|
|
||||||
pub async fn broadcast_to_followers(
|
|
||||||
&self,
|
|
||||||
local_user_id: uuid::Uuid,
|
|
||||||
ap_id: Url,
|
|
||||||
object: serde_json::Value,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let data = self.federation_config.to_request_data();
|
|
||||||
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
let create = CreateActivity {
|
|
||||||
id: ap_id.clone(),
|
|
||||||
kind: Default::default(),
|
|
||||||
actor: ObjectId::from(local_actor.ap_id.clone()),
|
|
||||||
object,
|
|
||||||
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
|
||||||
cc: vec![local_actor.followers_url.to_string()],
|
|
||||||
};
|
|
||||||
let create_with_ctx = WithContext::new_default(create);
|
|
||||||
|
|
||||||
let sends =
|
|
||||||
SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?;
|
|
||||||
let failures = send_with_retry(sends, &data).await;
|
|
||||||
if !failures.is_empty() {
|
|
||||||
tracing::warn!(
|
|
||||||
count = failures.len(),
|
|
||||||
"some activity deliveries failed permanently"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Broadcast a Delete activity to all accepted followers for a removed review.
|
/// Broadcast a Delete activity to all accepted followers for a removed review.
|
||||||
pub async fn broadcast_delete_to_followers(
|
pub async fn broadcast_delete_to_followers(
|
||||||
&self,
|
&self,
|
||||||
@@ -716,43 +708,6 @@ impl ActivityPubService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Broadcast an Update(Note) activity to all accepted followers for an edited review.
|
|
||||||
pub async fn broadcast_update_to_followers(
|
|
||||||
&self,
|
|
||||||
local_user_id: uuid::Uuid,
|
|
||||||
object: serde_json::Value,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let data = self.federation_config.to_request_data();
|
|
||||||
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
let update_id = Url::parse(&format!(
|
|
||||||
"{}/activities/update/{}",
|
|
||||||
self.base_url,
|
|
||||||
uuid::Uuid::new_v4()
|
|
||||||
))?;
|
|
||||||
let update = crate::activities::UpdateActivity {
|
|
||||||
id: update_id,
|
|
||||||
kind: Default::default(),
|
|
||||||
actor: ObjectId::from(local_actor.ap_id.clone()),
|
|
||||||
object,
|
|
||||||
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
|
||||||
cc: vec![local_actor.followers_url.to_string()],
|
|
||||||
};
|
|
||||||
let update_with_ctx = WithContext::new_default(update);
|
|
||||||
let sends =
|
|
||||||
SendActivityTask::prepare(&update_with_ctx, &local_actor, inboxes, &data).await?;
|
|
||||||
let failures = send_with_retry(sends, &data).await;
|
|
||||||
if !failures.is_empty() {
|
|
||||||
tracing::warn!(
|
|
||||||
count = failures.len(),
|
|
||||||
"some update activity deliveries failed"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> {
|
pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> {
|
||||||
use activitypub_federation::traits::Object;
|
use activitypub_federation::traits::Object;
|
||||||
|
|
||||||
@@ -1202,33 +1157,38 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
|
|||||||
) -> Result<(), domain::errors::DomainError> {
|
) -> Result<(), domain::errors::DomainError> {
|
||||||
let user_uuid = author_user_id.as_uuid();
|
let user_uuid = author_user_id.as_uuid();
|
||||||
let data = self.federation_config.to_request_data();
|
let data = self.federation_config.to_request_data();
|
||||||
let local_actor = get_local_actor(user_uuid, &data)
|
let Some((local_actor, inboxes)) = self
|
||||||
|
.accepted_follower_inboxes(&data, user_uuid)
|
||||||
.await
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?
|
||||||
|
else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
let (ap_id, note) = thought_note_json(thought, &local_actor, &self.base_url)
|
||||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
let ap_id = url::Url::parse(&format!("{}/thoughts/{}", self.base_url, thought.id))
|
let create = crate::activities::CreateActivity {
|
||||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
id: ap_id,
|
||||||
|
kind: Default::default(),
|
||||||
let mut note = serde_json::json!({
|
actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
|
||||||
"type": "Note",
|
object: note,
|
||||||
"id": ap_id.to_string(),
|
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
||||||
"attributedTo": local_actor.ap_id.to_string(),
|
cc: vec![local_actor.followers_url.to_string()],
|
||||||
"content": thought.content.as_str(),
|
};
|
||||||
"published": thought.created_at.to_rfc3339(),
|
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||||
"to": [crate::urls::AS_PUBLIC],
|
&activitypub_federation::protocol::context::WithContext::new_default(create),
|
||||||
"cc": [local_actor.followers_url.to_string()],
|
&local_actor,
|
||||||
"sensitive": thought.sensitive,
|
inboxes,
|
||||||
});
|
&data,
|
||||||
if let Some(ref cw) = thought.content_warning {
|
)
|
||||||
note["summary"] = serde_json::json!(cw);
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||||
|
let failures = send_with_retry(sends, &data).await;
|
||||||
|
if !failures.is_empty() {
|
||||||
|
tracing::warn!(count = failures.len(), "some Create deliveries failed");
|
||||||
}
|
}
|
||||||
if let Some(ref reply_url) = thought.in_reply_to_url {
|
Ok(())
|
||||||
note["inReplyTo"] = serde_json::json!(reply_url);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.broadcast_to_followers(user_uuid, ap_id, note)
|
|
||||||
.await
|
|
||||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn broadcast_delete(
|
async fn broadcast_delete(
|
||||||
@@ -1254,35 +1214,44 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
|
|||||||
) -> Result<(), domain::errors::DomainError> {
|
) -> Result<(), domain::errors::DomainError> {
|
||||||
let user_uuid = author_user_id.as_uuid();
|
let user_uuid = author_user_id.as_uuid();
|
||||||
let data = self.federation_config.to_request_data();
|
let data = self.federation_config.to_request_data();
|
||||||
let local_actor = get_local_actor(user_uuid, &data)
|
let Some((local_actor, inboxes)) = self
|
||||||
|
.accepted_follower_inboxes(&data, user_uuid)
|
||||||
.await
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?
|
||||||
|
else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
let (_ap_id, note) = thought_note_json(thought, &local_actor, &self.base_url)
|
||||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
let ap_id = url::Url::parse(&format!("{}/thoughts/{}", self.base_url, thought.id))
|
let update_id = url::Url::parse(&format!(
|
||||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
"{}/activities/update/{}",
|
||||||
let mut note = serde_json::json!({
|
self.base_url,
|
||||||
"type": "Note",
|
uuid::Uuid::new_v4()
|
||||||
"id": ap_id.to_string(),
|
))
|
||||||
"attributedTo": local_actor.ap_id.to_string(),
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||||
"content": thought.content.as_str(),
|
let update = crate::activities::UpdateActivity {
|
||||||
"published": thought.created_at.to_rfc3339(),
|
id: update_id,
|
||||||
"to": [crate::urls::AS_PUBLIC],
|
kind: Default::default(),
|
||||||
"cc": [local_actor.followers_url.to_string()],
|
actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
|
||||||
"sensitive": thought.sensitive,
|
object: note,
|
||||||
});
|
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
||||||
if let Some(ref cw) = thought.content_warning {
|
cc: vec![local_actor.followers_url.to_string()],
|
||||||
note["summary"] = serde_json::json!(cw);
|
};
|
||||||
|
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||||
|
&activitypub_federation::protocol::context::WithContext::new_default(update),
|
||||||
|
&local_actor,
|
||||||
|
inboxes,
|
||||||
|
&data,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||||
|
let failures = send_with_retry(sends, &data).await;
|
||||||
|
if !failures.is_empty() {
|
||||||
|
tracing::warn!(count = failures.len(), "some Update deliveries failed");
|
||||||
}
|
}
|
||||||
if let Some(ref reply_url) = thought.in_reply_to_url {
|
Ok(())
|
||||||
note["inReplyTo"] = serde_json::json!(reply_url);
|
|
||||||
}
|
|
||||||
if let Some(updated_at) = thought.updated_at {
|
|
||||||
note["updated"] = serde_json::json!(updated_at.to_rfc3339());
|
|
||||||
}
|
|
||||||
|
|
||||||
self.broadcast_update_to_followers(user_uuid, note)
|
|
||||||
.await
|
|
||||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn broadcast_announce(
|
async fn broadcast_announce(
|
||||||
|
|||||||
Reference in New Issue
Block a user