Compare commits
28 Commits
2e3b81de17
...
5f61a71336
| Author | SHA1 | Date | |
|---|---|---|---|
| 5f61a71336 | |||
| 7b1e26fa9e | |||
| bd1dd89f78 | |||
| a8caca8df8 | |||
| 296dfdaeee | |||
| db1625dc0d | |||
| 4b20bfd369 | |||
| cf78b3e28f | |||
| 159a8ca43b | |||
| cf6eec55da | |||
| 44e152783f | |||
| 2a51241bb5 | |||
| 009b2d43c9 | |||
| f946239757 | |||
| 90dbf76753 | |||
| 6c83c193ed | |||
| ca1ebc4b68 | |||
| d360e506db | |||
| f008564c32 | |||
| 82f2a3aaa0 | |||
| 3455512bbb | |||
| 09bebf7dc9 | |||
| e04b08c202 | |||
| a7527c50be | |||
| e691b20a05 | |||
| 0cf34184d9 | |||
| 6d365dd3cf | |||
| 9af1d33e71 |
@@ -13,6 +13,16 @@ use url::Url;
|
||||
#[serde(rename = "Announce")]
|
||||
pub struct AnnounceType;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename = "Like")]
|
||||
pub struct LikeType;
|
||||
|
||||
impl Default for LikeType {
|
||||
fn default() -> Self {
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
use crate::actors::DbActor;
|
||||
use crate::data::FederationData;
|
||||
use crate::error::Error;
|
||||
@@ -269,6 +279,20 @@ impl Activity for UndoActivity {
|
||||
tracing::info!(ap_id = %ap_id_str, "undo Add (watchlist remove)");
|
||||
}
|
||||
}
|
||||
"Like" => {
|
||||
if let Some(obj_url_str) = self.object.get("object").and_then(|o| o.as_str())
|
||||
&& let Ok(obj_url) = Url::parse(obj_url_str)
|
||||
&& obj_url.host_str().unwrap_or("") == data.domain
|
||||
{
|
||||
data.object_handler
|
||||
.on_unlike(&obj_url, self.actor.inner())
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process unlike");
|
||||
});
|
||||
}
|
||||
tracing::info!(actor = %self.actor.inner(), "received Undo(Like)");
|
||||
}
|
||||
other => {
|
||||
tracing::debug!(kind = %other, "ignoring Undo of unknown activity type");
|
||||
}
|
||||
@@ -317,7 +341,14 @@ impl Activity for CreateActivity {
|
||||
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
|
||||
return Ok(());
|
||||
}
|
||||
let ap_id = self.id.clone();
|
||||
// Use the Note's own id, not the Create activity id (which ends in /activity).
|
||||
// Delete activities reference the Note id, so they must match.
|
||||
let ap_id = self
|
||||
.object
|
||||
.get("id")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Url::parse(s).ok())
|
||||
.unwrap_or_else(|| self.id.clone());
|
||||
let actor_url = self.actor.inner().clone();
|
||||
data.object_handler
|
||||
.on_create(&ap_id, &actor_url, self.object)
|
||||
@@ -506,11 +537,68 @@ impl Activity for AnnounceActivity {
|
||||
self.published.unwrap_or_else(chrono::Utc::now),
|
||||
)
|
||||
.await?;
|
||||
data.object_handler
|
||||
.on_announce_received(&self.object, self.actor.inner())
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process announce notification");
|
||||
});
|
||||
tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// --- Like ---
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LikeActivity {
|
||||
pub id: Url,
|
||||
#[serde(rename = "type")]
|
||||
pub kind: LikeType,
|
||||
pub actor: ObjectId<DbActor>,
|
||||
pub object: Url,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Activity for LikeActivity {
|
||||
type DataType = FederationData;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
&self.id
|
||||
}
|
||||
|
||||
fn actor(&self) -> &Url {
|
||||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
let domain = self.actor().host_str().unwrap_or("");
|
||||
if data.federation_repo.is_domain_blocked(domain).await? {
|
||||
tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Only process if the liked object is on our instance.
|
||||
if self.object.host_str().unwrap_or("") != data.domain {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
data.object_handler
|
||||
.on_like(&self.object, self.actor.inner())
|
||||
.await
|
||||
.map_err(|e| crate::error::Error::from(anyhow::anyhow!(e)))?;
|
||||
|
||||
tracing::info!(actor = %self.actor.inner(), object = %self.object, "received like");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// --- Add ---
|
||||
|
||||
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
|
||||
@@ -642,4 +730,6 @@ pub enum InboxActivities {
|
||||
Add(AddActivity),
|
||||
#[serde(rename = "Block")]
|
||||
Block(BlockActivity),
|
||||
#[serde(rename = "Like")]
|
||||
Like(LikeActivity),
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ pub struct DbActor {
|
||||
pub public_key_pem: String,
|
||||
pub private_key_pem: Option<String>,
|
||||
pub inbox_url: Url,
|
||||
pub shared_inbox_url: Option<Url>,
|
||||
pub outbox_url: Url,
|
||||
pub followers_url: Url,
|
||||
pub following_url: Url,
|
||||
@@ -118,6 +119,7 @@ pub async fn get_local_actor(
|
||||
|
||||
let ap_id = crate::urls::actor_url(&data.base_url, user_id);
|
||||
let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid inbox url");
|
||||
let shared_inbox_url = Url::parse(&format!("{}/inbox", data.base_url)).ok();
|
||||
let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid outbox url");
|
||||
let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid followers url");
|
||||
let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid following url");
|
||||
@@ -128,6 +130,7 @@ pub async fn get_local_actor(
|
||||
public_key_pem: public_key,
|
||||
private_key_pem: Some(private_key),
|
||||
inbox_url,
|
||||
shared_inbox_url,
|
||||
outbox_url,
|
||||
followers_url,
|
||||
following_url,
|
||||
@@ -181,6 +184,7 @@ impl Object for DbActor {
|
||||
|
||||
let ap_id = crate::urls::actor_url(&data.base_url, user_id);
|
||||
let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url");
|
||||
let shared_inbox_url = Url::parse(&format!("{}/inbox", data.base_url)).ok();
|
||||
let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url");
|
||||
let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid url");
|
||||
let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid url");
|
||||
@@ -191,6 +195,7 @@ impl Object for DbActor {
|
||||
public_key_pem: public_key,
|
||||
private_key_pem: private_key,
|
||||
inbox_url,
|
||||
shared_inbox_url,
|
||||
outbox_url,
|
||||
followers_url,
|
||||
following_url,
|
||||
@@ -268,11 +273,12 @@ impl Object for DbActor {
|
||||
}
|
||||
|
||||
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
|
||||
let shared_inbox_url = json.endpoints.as_ref().map(|e| e.shared_inbox.to_string());
|
||||
let actor = RemoteActor {
|
||||
url: json.id.inner().to_string(),
|
||||
handle: json.preferred_username.clone(),
|
||||
inbox_url: json.inbox.to_string(),
|
||||
shared_inbox_url: None,
|
||||
shared_inbox_url,
|
||||
display_name: json.name.clone(),
|
||||
avatar_url: json.icon.as_ref().map(|i| i.url.to_string()),
|
||||
outbox_url: Some(json.outbox.to_string()),
|
||||
@@ -283,6 +289,10 @@ impl Object for DbActor {
|
||||
let user_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes());
|
||||
let ap_id = json.id.inner().clone();
|
||||
let inbox_url = json.inbox.clone();
|
||||
let shared_inbox_url = json
|
||||
.endpoints
|
||||
.as_ref()
|
||||
.and_then(|e| Url::parse(e.shared_inbox.as_str()).ok());
|
||||
let outbox_url = json.outbox.clone();
|
||||
let followers_url = json.followers.clone();
|
||||
let following_url = json.following.clone();
|
||||
@@ -293,6 +303,7 @@ impl Object for DbActor {
|
||||
public_key_pem: json.public_key.public_key_pem,
|
||||
private_key_pem: None,
|
||||
inbox_url,
|
||||
shared_inbox_url,
|
||||
outbox_url,
|
||||
followers_url,
|
||||
following_url,
|
||||
|
||||
@@ -42,6 +42,27 @@ pub trait ApObjectHandler: Send + Sync {
|
||||
/// Actor unfollowed/was removed — clean up all their remote content.
|
||||
async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>;
|
||||
|
||||
/// Called when a remote actor likes a local thought.
|
||||
/// `object_url` is the AP URL of the liked note (e.g. `{base}/thoughts/{uuid}`).
|
||||
/// `actor_url` is the AP URL of the remote actor who sent the Like.
|
||||
async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
|
||||
/// Called when a remote actor boosts (Announce) a local thought.
|
||||
/// `object_url` is the AP URL of the announced note.
|
||||
/// `actor_url` is the AP URL of the remote actor who sent the Announce.
|
||||
async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
|
||||
/// Called when a remote actor removes a Like from a local thought.
|
||||
async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
|
||||
/// Called when an inbound Note tags a local user with a Mention.
|
||||
async fn on_mention(
|
||||
&self,
|
||||
thought_ap_id: &Url,
|
||||
mentioned_user_uuid: uuid::Uuid,
|
||||
actor_url: &Url,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
/// Total number of locally-authored posts across all users.
|
||||
async fn count_local_posts(&self) -> anyhow::Result<u64>;
|
||||
}
|
||||
|
||||
@@ -28,6 +28,44 @@ use crate::{
|
||||
webfinger::webfinger_handler,
|
||||
};
|
||||
|
||||
fn content_to_html(text: &str) -> String {
|
||||
let escaped = text
|
||||
.replace('&', "&")
|
||||
.replace('<', "<")
|
||||
.replace('>', ">")
|
||||
.replace('"', """);
|
||||
let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect();
|
||||
if paragraphs.is_empty() {
|
||||
format!("<p>{}</p>", escaped)
|
||||
} else {
|
||||
paragraphs
|
||||
.iter()
|
||||
.map(|p| format!("<p>{}</p>", p))
|
||||
.collect::<Vec<_>>()
|
||||
.join("")
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_hashtag_tags(content: &str, base_url: &str) -> Vec<serde_json::Value> {
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
let mut tags = Vec::new();
|
||||
for word in content.split_whitespace() {
|
||||
let tag = word.trim_matches(|c: char| !c.is_alphanumeric() && c != '#');
|
||||
if let Some(name) = tag.strip_prefix('#')
|
||||
&& !name.is_empty()
|
||||
&& seen.insert(name.to_lowercase())
|
||||
{
|
||||
let lower = name.to_lowercase();
|
||||
tags.push(serde_json::json!({
|
||||
"type": "Hashtag",
|
||||
"name": format!("#{}", lower),
|
||||
"href": format!("{}/tags/{}", base_url, lower),
|
||||
}));
|
||||
}
|
||||
}
|
||||
tags
|
||||
}
|
||||
|
||||
fn thought_note_json(
|
||||
thought: &domain::models::thought::Thought,
|
||||
local_actor: &crate::actors::DbActor,
|
||||
@@ -56,7 +94,7 @@ fn thought_note_json(
|
||||
"id": ap_id.to_string(),
|
||||
"url": ap_id.to_string(),
|
||||
"attributedTo": local_actor.ap_id.to_string(),
|
||||
"content": thought.content.as_str(),
|
||||
"content": content_to_html(thought.content.as_str()),
|
||||
"published": thought.created_at.to_rfc3339(),
|
||||
"to": to,
|
||||
"cc": cc,
|
||||
@@ -71,6 +109,10 @@ fn thought_note_json(
|
||||
if let Some(updated_at) = thought.updated_at {
|
||||
note["updated"] = serde_json::json!(updated_at.to_rfc3339());
|
||||
}
|
||||
let hashtag_tags = extract_hashtag_tags(thought.content.as_str(), base_url);
|
||||
if !hashtag_tags.is_empty() {
|
||||
note["tag"] = serde_json::json!(hashtag_tags);
|
||||
}
|
||||
Ok((ap_id, note))
|
||||
}
|
||||
|
||||
@@ -219,7 +261,13 @@ impl ActivityPubService {
|
||||
Ok(serde_json::to_string(&WithContext::new_default(person))?)
|
||||
}
|
||||
|
||||
pub fn router(&self) -> Router {
|
||||
/// Returns the ActivityPub router compatible with any outer state `S`.
|
||||
/// Handlers only use `Data<FederationData>` injected by the middleware layer,
|
||||
/// so the router is independent of the application state type.
|
||||
pub fn router<S>(&self) -> Router<S>
|
||||
where
|
||||
S: Clone + Send + Sync + 'static,
|
||||
{
|
||||
Router::new()
|
||||
.route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler))
|
||||
.route("/nodeinfo/2.0", get(nodeinfo_handler))
|
||||
@@ -340,6 +388,105 @@ impl ActivityPubService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a Like activity to a single inbox.
|
||||
pub async fn broadcast_like_to_inbox(
|
||||
&self,
|
||||
liker_user_id: uuid::Uuid,
|
||||
object_ap_id: url::Url,
|
||||
author_inbox_url: url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(liker_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
// Deterministic ID so Undo(Like) can reference the same activity.
|
||||
let like_id = url::Url::parse(&format!(
|
||||
"{}/activities/like/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v5(
|
||||
&uuid::Uuid::NAMESPACE_URL,
|
||||
format!("{}/{}", liker_user_id, object_ap_id).as_bytes(),
|
||||
)
|
||||
))?;
|
||||
|
||||
let like = crate::activities::LikeActivity {
|
||||
id: like_id,
|
||||
kind: Default::default(),
|
||||
actor: ObjectId::from(local_actor.ap_id.clone()),
|
||||
object: object_ap_id,
|
||||
};
|
||||
|
||||
let sends = SendActivityTask::prepare(
|
||||
&WithContext::new_default(like),
|
||||
&local_actor,
|
||||
vec![author_inbox_url],
|
||||
&data,
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(
|
||||
count = failures.len(),
|
||||
"some Like deliveries failed permanently"
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send an Undo(Like) activity to a single inbox.
|
||||
pub async fn broadcast_undo_like_to_inbox(
|
||||
&self,
|
||||
liker_user_id: uuid::Uuid,
|
||||
object_ap_id: url::Url,
|
||||
author_inbox_url: url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(liker_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
// Reconstruct the same deterministic like ID.
|
||||
let like_id = url::Url::parse(&format!(
|
||||
"{}/activities/like/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v5(
|
||||
&uuid::Uuid::NAMESPACE_URL,
|
||||
format!("{}/{}", liker_user_id, object_ap_id).as_bytes(),
|
||||
)
|
||||
))?;
|
||||
|
||||
let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let undo = crate::activities::UndoActivity {
|
||||
id: undo_id,
|
||||
kind: Default::default(),
|
||||
actor: ObjectId::from(local_actor.ap_id.clone()),
|
||||
object: serde_json::json!({
|
||||
"type": "Like",
|
||||
"id": like_id.to_string(),
|
||||
"actor": local_actor.ap_id.to_string(),
|
||||
"object": object_ap_id.to_string(),
|
||||
}),
|
||||
};
|
||||
|
||||
let sends = SendActivityTask::prepare(
|
||||
&WithContext::new_default(undo),
|
||||
&local_actor,
|
||||
vec![author_inbox_url],
|
||||
&data,
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(
|
||||
count = failures.len(),
|
||||
"some Undo(Like) deliveries failed permanently"
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resolve a `@user@domain` handle to a `DbActor` over HTTPS directly.
|
||||
/// The library's `webfinger_resolve_actor` tries HTTP first in debug mode, which breaks
|
||||
/// on servers that don't redirect HTTP → HTTPS.
|
||||
@@ -428,9 +575,12 @@ impl ActivityPubService {
|
||||
url: remote_actor.ap_id.to_string(),
|
||||
handle: full_handle,
|
||||
inbox_url: remote_actor.inbox_url.to_string(),
|
||||
shared_inbox_url: None,
|
||||
shared_inbox_url: remote_actor
|
||||
.shared_inbox_url
|
||||
.as_ref()
|
||||
.map(|u| u.to_string()),
|
||||
display_name: Some(remote_actor.username.clone()),
|
||||
avatar_url: None,
|
||||
avatar_url: remote_actor.avatar_url.as_ref().map(|u| u.to_string()),
|
||||
outbox_url: Some(remote_actor.outbox_url.to_string()),
|
||||
};
|
||||
data.federation_repo
|
||||
@@ -1390,6 +1540,45 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
liker_user_id: &domain::value_objects::UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let object = url::Url::parse(object_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let inbox = url::Url::parse(author_inbox_url)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_like_to_inbox(liker_user_id.as_uuid(), object, inbox)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
liker_user_id: &domain::value_objects::UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let object = url::Url::parse(object_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let inbox = url::Url::parse(author_inbox_url)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_undo_like_to_inbox(liker_user_id.as_uuid(), object, inbox)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
async fn broadcast_actor_update(
|
||||
&self,
|
||||
user_id: &domain::value_objects::UserId,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
self.broadcast_actor_update(user_id.as_uuid())
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -1452,7 +1641,7 @@ impl domain::ports::FederationActionPort for ActivityPubService {
|
||||
handle: full_handle,
|
||||
display_name: Some(actor.username.clone()),
|
||||
inbox_url: actor.inbox_url.to_string(),
|
||||
shared_inbox_url: None,
|
||||
shared_inbox_url: actor.shared_inbox_url.as_ref().map(|u| u.to_string()),
|
||||
public_key: actor.public_key_pem.clone(),
|
||||
avatar_url: actor.avatar_url.as_ref().map(|u| u.to_string()),
|
||||
last_fetched_at: actor.last_refreshed_at,
|
||||
|
||||
@@ -7,19 +7,25 @@ use url::Url;
|
||||
use crate::note::ThoughtNote;
|
||||
use crate::urls::ThoughtsUrls;
|
||||
use activitypub_base::ApObjectHandler;
|
||||
use domain::ports::ActivityPubRepository;
|
||||
use domain::ports::{ActivityPubRepository, EventPublisher};
|
||||
use domain::value_objects::UserId;
|
||||
|
||||
pub struct ThoughtsObjectHandler {
|
||||
repo: Arc<dyn ActivityPubRepository>,
|
||||
urls: ThoughtsUrls,
|
||||
event_publisher: Option<Arc<dyn EventPublisher>>,
|
||||
}
|
||||
|
||||
impl ThoughtsObjectHandler {
|
||||
pub fn new(repo: Arc<dyn ActivityPubRepository>, base_url: &str) -> Self {
|
||||
pub fn new(
|
||||
repo: Arc<dyn ActivityPubRepository>,
|
||||
base_url: &str,
|
||||
event_publisher: Option<Arc<dyn EventPublisher>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
repo,
|
||||
urls: ThoughtsUrls::new(base_url),
|
||||
event_publisher,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -138,9 +144,47 @@ impl ApObjectHandler for ThoughtsObjectHandler {
|
||||
note.sensitive,
|
||||
note.summary,
|
||||
visibility,
|
||||
note.in_reply_to.as_ref(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow!("{e}"))
|
||||
.map_err(|e| anyhow!("{e}"))?;
|
||||
|
||||
// Fire mention notifications for local @mentions in the note's tag array.
|
||||
let base_url = url::Url::parse(&self.urls.base_url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|h| h.to_string()))
|
||||
.unwrap_or_default();
|
||||
|
||||
for tag in ¬e.tag {
|
||||
if tag.get("type").and_then(|t| t.as_str()) != Some("Mention") {
|
||||
continue;
|
||||
}
|
||||
let href = match tag.get("href").and_then(|h| h.as_str()) {
|
||||
Some(h) => h,
|
||||
None => continue,
|
||||
};
|
||||
let href_url = match url::Url::parse(href) {
|
||||
Ok(u) => u,
|
||||
Err(_) => continue,
|
||||
};
|
||||
if href_url.host_str().unwrap_or("") != base_url {
|
||||
continue;
|
||||
}
|
||||
let user_uuid = href_url
|
||||
.path()
|
||||
.strip_prefix("/users/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
if let Some(uuid) = user_uuid {
|
||||
self.on_mention(ap_id, uuid, actor_url)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process mention notification");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_update(
|
||||
@@ -170,6 +214,132 @@ impl ApObjectHandler for ThoughtsObjectHandler {
|
||||
.map_err(|e| anyhow!("{e}"))
|
||||
}
|
||||
|
||||
async fn on_like(&self, object_url: &Url, actor_url: &Url) -> Result<()> {
|
||||
let thought_uuid = object_url
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => {
|
||||
tracing::debug!(object = %object_url, "on_like: not a local thought URL, skipping");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let actor_user_id = self
|
||||
.repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow!("{e}"))?;
|
||||
|
||||
let actor_user_id = match actor_user_id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
tracing::debug!(actor = %actor_url, "on_like: remote actor not interned, skipping notification");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid);
|
||||
let like_id = domain::value_objects::LikeId::new();
|
||||
ep.publish(&domain::events::DomainEvent::LikeAdded {
|
||||
like_id,
|
||||
user_id: actor_user_id,
|
||||
thought_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_unlike(&self, _object_url: &url::Url, _actor_url: &url::Url) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_mention(
|
||||
&self,
|
||||
thought_ap_id: &url::Url,
|
||||
mentioned_user_uuid: uuid::Uuid,
|
||||
actor_url: &url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let author_user_id = match self
|
||||
.repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow!("{e}"))?
|
||||
{
|
||||
Some(id) => id,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let thought_uuid = thought_ap_id
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
ep.publish(&domain::events::DomainEvent::MentionReceived {
|
||||
thought_id: domain::value_objects::ThoughtId::from_uuid(thought_uuid),
|
||||
mentioned_user_id: domain::value_objects::UserId::from_uuid(mentioned_user_uuid),
|
||||
author_user_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> Result<()> {
|
||||
let thought_uuid = object_url
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let actor_user_id = self
|
||||
.repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow!("{e}"))?;
|
||||
|
||||
let actor_user_id = match actor_user_id {
|
||||
Some(id) => id,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid);
|
||||
let boost_id = domain::value_objects::BoostId::new();
|
||||
ep.publish(&domain::events::DomainEvent::BoostAdded {
|
||||
boost_id,
|
||||
user_id: actor_user_id,
|
||||
thought_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn count_local_posts(&self) -> Result<u64> {
|
||||
self.repo
|
||||
.count_local_notes()
|
||||
|
||||
@@ -24,6 +24,8 @@ pub struct ThoughtNote {
|
||||
pub sensitive: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub summary: Option<String>,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub tag: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
impl ThoughtNote {
|
||||
@@ -50,6 +52,7 @@ impl ThoughtNote {
|
||||
in_reply_to,
|
||||
sensitive,
|
||||
summary,
|
||||
tag: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +68,9 @@ pub enum EventPayload {
|
||||
UserRegistered {
|
||||
user_id: String,
|
||||
},
|
||||
ProfileUpdated {
|
||||
user_id: String,
|
||||
},
|
||||
FetchRemoteActorPosts {
|
||||
actor_ap_url: String,
|
||||
outbox_url: String,
|
||||
@@ -78,6 +81,11 @@ pub enum EventPayload {
|
||||
connection_type: String,
|
||||
page: u32,
|
||||
},
|
||||
MentionReceived {
|
||||
thought_id: String,
|
||||
mentioned_user_id: String,
|
||||
author_user_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl EventPayload {
|
||||
@@ -98,8 +106,10 @@ impl EventPayload {
|
||||
Self::UserBlocked { .. } => "users.blocked",
|
||||
Self::UserUnblocked { .. } => "users.unblocked",
|
||||
Self::UserRegistered { .. } => "users.registered",
|
||||
Self::ProfileUpdated { .. } => "users.profile_updated",
|
||||
Self::FetchRemoteActorPosts { .. } => "federation.fetch_outbox",
|
||||
Self::FetchActorConnections { .. } => "federation.fetch_connections",
|
||||
Self::MentionReceived { .. } => "mentions.received",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -209,6 +219,9 @@ impl From<&DomainEvent> for EventPayload {
|
||||
DomainEvent::UserRegistered { user_id } => Self::UserRegistered {
|
||||
user_id: user_id.to_string(),
|
||||
},
|
||||
DomainEvent::ProfileUpdated { user_id } => Self::ProfileUpdated {
|
||||
user_id: user_id.to_string(),
|
||||
},
|
||||
DomainEvent::FetchRemoteActorPosts {
|
||||
actor_ap_url,
|
||||
outbox_url,
|
||||
@@ -227,6 +240,15 @@ impl From<&DomainEvent> for EventPayload {
|
||||
connection_type: connection_type.clone(),
|
||||
page: *page,
|
||||
},
|
||||
DomainEvent::MentionReceived {
|
||||
thought_id,
|
||||
mentioned_user_id,
|
||||
author_user_id,
|
||||
} => Self::MentionReceived {
|
||||
thought_id: thought_id.to_string(),
|
||||
mentioned_user_id: mentioned_user_id.to_string(),
|
||||
author_user_id: author_user_id.to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -345,6 +367,9 @@ impl TryFrom<EventPayload> for DomainEvent {
|
||||
EventPayload::UserRegistered { user_id } => DomainEvent::UserRegistered {
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
},
|
||||
EventPayload::ProfileUpdated { user_id } => DomainEvent::ProfileUpdated {
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
},
|
||||
EventPayload::FetchRemoteActorPosts {
|
||||
actor_ap_url,
|
||||
outbox_url,
|
||||
@@ -363,6 +388,18 @@ impl TryFrom<EventPayload> for DomainEvent {
|
||||
connection_type,
|
||||
page,
|
||||
},
|
||||
EventPayload::MentionReceived {
|
||||
thought_id,
|
||||
mentioned_user_id,
|
||||
author_user_id,
|
||||
} => DomainEvent::MentionReceived {
|
||||
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||
mentioned_user_id: UserId::from_uuid(parse_uuid(
|
||||
&mentioned_user_id,
|
||||
"mentioned_user_id",
|
||||
)?),
|
||||
author_user_id: UserId::from_uuid(parse_uuid(&author_user_id, "author_user_id")?),
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,11 +218,24 @@ impl ActivityPubRepository for PgActivityPubRepository {
|
||||
sensitive: bool,
|
||||
content_warning: Option<String>,
|
||||
visibility: &str,
|
||||
in_reply_to: Option<&Url>,
|
||||
) -> Result<(), DomainError> {
|
||||
let capped: String = content.chars().take(500).collect();
|
||||
let (in_reply_to_id, in_reply_to_url) = match in_reply_to {
|
||||
Some(url) => {
|
||||
// If the parent is a local thought, extract its UUID for in_reply_to_id.
|
||||
let local_uuid = url
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
(local_uuid, Some(url.as_str().to_string()))
|
||||
}
|
||||
None => (None, None),
|
||||
};
|
||||
sqlx::query(
|
||||
"INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at)
|
||||
VALUES($1,$2,$3,$4,$8,$5,false,$6,$7) ON CONFLICT(ap_id) DO NOTHING",
|
||||
"INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at,in_reply_to_id,in_reply_to_url)
|
||||
VALUES($1,$2,$3,$4,$8,$5,false,$6,$7,$9,$10) ON CONFLICT(ap_id) DO NOTHING",
|
||||
)
|
||||
.bind(uuid::Uuid::new_v4())
|
||||
.bind(author_id.as_uuid())
|
||||
@@ -232,6 +245,8 @@ impl ActivityPubRepository for PgActivityPubRepository {
|
||||
.bind(content_warning)
|
||||
.bind(published)
|
||||
.bind(visibility)
|
||||
.bind(in_reply_to_id)
|
||||
.bind(&in_reply_to_url)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
@@ -308,6 +323,7 @@ mod tests {
|
||||
false,
|
||||
None,
|
||||
"public",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -67,6 +67,19 @@ struct FeedRow {
|
||||
boosted_by_viewer: bool,
|
||||
}
|
||||
|
||||
fn federation_following_clause(follower: Option<uuid::Uuid>) -> String {
|
||||
match follower {
|
||||
Some(fid) => format!(
|
||||
" OR t.user_id IN (
|
||||
SELECT u2.id FROM users u2
|
||||
JOIN federation_following ff ON u2.ap_id = ff.remote_actor_url
|
||||
WHERE ff.local_user_id = '{fid}'
|
||||
)"
|
||||
),
|
||||
None => String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn feed_select(viewer: Option<uuid::Uuid>) -> String {
|
||||
let viewer_checks = match viewer {
|
||||
Some(uid) => format!(
|
||||
@@ -82,15 +95,27 @@ fn feed_select(viewer: Option<uuid::Uuid>) -> String {
|
||||
t.in_reply_to_id, t.in_reply_to_url, t.ap_id AS t_ap_id,
|
||||
t.visibility, t.content_warning, t.sensitive, t.local AS t_local,
|
||||
t.created_at AS thought_created_at, t.updated_at,
|
||||
u.id AS author_id, u.username, u.email, u.password_hash,
|
||||
u.display_name, u.bio, u.avatar_url, u.header_url, u.custom_css,
|
||||
u.id AS author_id,
|
||||
CASE WHEN NOT u.local AND ra.handle IS NOT NULL AND ra.handle != ''
|
||||
THEN '@' || ra.handle ||
|
||||
CASE WHEN ra.handle NOT LIKE '%@%'
|
||||
THEN '@' || SPLIT_PART(ra.url, '/', 3)
|
||||
ELSE '' END
|
||||
ELSE u.username END AS username,
|
||||
u.email, u.password_hash,
|
||||
COALESCE(ra.display_name, u.display_name) AS display_name,
|
||||
u.bio,
|
||||
COALESCE(ra.avatar_url, u.avatar_url) AS avatar_url,
|
||||
u.header_url, u.custom_css,
|
||||
u.local AS author_local, u.ap_id AS u_ap_id, u.inbox_url,
|
||||
u.created_at AS author_created_at, u.updated_at AS author_updated_at,
|
||||
(SELECT COUNT(*) FROM likes l WHERE l.thought_id=t.id) AS like_count,
|
||||
(SELECT COUNT(*) FROM boosts b WHERE b.thought_id=t.id) AS boost_count,
|
||||
(SELECT COUNT(*) FROM thoughts r WHERE r.in_reply_to_id=t.id) AS reply_count,
|
||||
{viewer_checks}
|
||||
FROM thoughts t JOIN users u ON u.id=t.user_id"
|
||||
FROM thoughts t
|
||||
JOIN users u ON u.id=t.user_id
|
||||
LEFT JOIN remote_actors ra ON u.ap_id = ra.url"
|
||||
)
|
||||
}
|
||||
|
||||
@@ -146,16 +171,19 @@ impl FeedRepository for PgFeedRepository {
|
||||
) -> Result<Paginated<FeedEntry>, DomainError> {
|
||||
let ids: Vec<uuid::Uuid> = following_ids.iter().map(|id| id.as_uuid()).collect();
|
||||
let viewer = viewer_id.map(|v| v.as_uuid());
|
||||
let total: i64 = sqlx::query_scalar(
|
||||
"SELECT COUNT(*) FROM thoughts t WHERE t.user_id=ANY($1) AND t.visibility != 'direct'",
|
||||
)
|
||||
.bind(&ids)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
let fed_clause = federation_following_clause(viewer);
|
||||
let count_sql = format!(
|
||||
"SELECT COUNT(*) FROM thoughts t WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct'",
|
||||
fed_clause
|
||||
);
|
||||
let total: i64 = sqlx::query_scalar(&count_sql)
|
||||
.bind(&ids)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
let sel = feed_select(viewer);
|
||||
let sql = format!("{sel} WHERE t.user_id=ANY($1) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3");
|
||||
let sql = format!("{sel} WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3", fed_clause);
|
||||
let rows = sqlx::query_as::<_, FeedRow>(&sql)
|
||||
.bind(&ids)
|
||||
.bind(page.limit())
|
||||
|
||||
@@ -33,6 +33,8 @@ pub struct ThoughtResponse {
|
||||
pub author: UserResponse,
|
||||
#[serde(rename = "replyToId")]
|
||||
pub in_reply_to_id: Option<Uuid>,
|
||||
#[serde(rename = "replyToUrl", skip_serializing_if = "Option::is_none")]
|
||||
pub in_reply_to_url: Option<String>,
|
||||
pub visibility: String,
|
||||
pub content_warning: Option<String>,
|
||||
pub sensitive: bool,
|
||||
|
||||
@@ -48,6 +48,29 @@ impl FederationEventService {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
// For replies to remote posts: in_reply_to_url is None but in_reply_to_id
|
||||
// points to the locally-stored remote thought. Resolve its ap_id so the
|
||||
// outbound Note includes inReplyTo and Mastodon threads it correctly.
|
||||
let thought = if thought.in_reply_to_url.is_none() {
|
||||
if let Some(ref reply_id) = thought.in_reply_to_id {
|
||||
match self.thoughts.find_by_id(reply_id).await? {
|
||||
Some(parent) => {
|
||||
let parent_ap_url = parent.ap_id.unwrap_or_else(|| {
|
||||
format!("{}/thoughts/{}", self.base_url, reply_id)
|
||||
});
|
||||
domain::models::thought::Thought {
|
||||
in_reply_to_url: Some(parent_ap_url),
|
||||
..thought
|
||||
}
|
||||
}
|
||||
None => thought,
|
||||
}
|
||||
} else {
|
||||
thought
|
||||
}
|
||||
} else {
|
||||
thought
|
||||
};
|
||||
self.ap
|
||||
.broadcast_create(user_id, &thought, user.username.as_str())
|
||||
.await
|
||||
@@ -93,6 +116,12 @@ impl FederationEventService {
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
// Only fan-out if the booster is a local user. Remote boosts must not be re-broadcast.
|
||||
let booster = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = booster;
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
@@ -167,6 +196,7 @@ impl FederationEventService {
|
||||
note.sensitive,
|
||||
note.content_warning,
|
||||
"public",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -220,6 +250,60 @@ impl FederationEventService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
DomainEvent::LikeAdded {
|
||||
like_id: _,
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
// Only federate: local liker + remote thought (has ap_id) + author has inbox.
|
||||
let liker = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = liker;
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.ap_id.is_some() => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let author = match self.users.find_by_id(&thought.user_id).await? {
|
||||
Some(u) if u.inbox_url.is_some() => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.unwrap();
|
||||
let inbox_url = author.inbox_url.unwrap();
|
||||
self.ap
|
||||
.broadcast_like(user_id, &object_ap_id, &inbox_url)
|
||||
.await
|
||||
}
|
||||
|
||||
DomainEvent::LikeRemoved {
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
let liker = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = liker;
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.ap_id.is_some() => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let author = match self.users.find_by_id(&thought.user_id).await? {
|
||||
Some(u) if u.inbox_url.is_some() => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.unwrap();
|
||||
let inbox_url = author.inbox_url.unwrap();
|
||||
self.ap
|
||||
.broadcast_undo_like(user_id, &object_ap_id, &inbox_url)
|
||||
.await
|
||||
}
|
||||
|
||||
DomainEvent::ProfileUpdated { user_id } => {
|
||||
self.ap.broadcast_actor_update(user_id).await
|
||||
}
|
||||
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
@@ -249,6 +333,9 @@ mod tests {
|
||||
updated: Mutex<Vec<ThoughtId>>,
|
||||
announced: Mutex<Vec<String>>,
|
||||
undo_announced: Mutex<Vec<String>>,
|
||||
liked: Mutex<Vec<String>>,
|
||||
undo_liked: Mutex<Vec<String>>,
|
||||
actor_updated: Mutex<Vec<UserId>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -287,6 +374,31 @@ mod tests {
|
||||
self.undo_announced.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
_: &UserId,
|
||||
ap_id: &str,
|
||||
_: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
self.liked.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
_: &UserId,
|
||||
ap_id: &str,
|
||||
_: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
self.undo_liked.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError> {
|
||||
self.actor_updated.lock().unwrap().push(user_id.clone());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn alice() -> User {
|
||||
@@ -414,6 +526,7 @@ mod tests {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone()); // ap_id = None
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
@@ -441,6 +554,7 @@ mod tests {
|
||||
let mut thought = local_thought(alice.id.clone());
|
||||
thought.local = false;
|
||||
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/123".into());
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
@@ -671,4 +785,103 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn like_added_local_user_remote_thought_broadcasts_like() {
|
||||
let store = TestStore::default();
|
||||
|
||||
let mut author = User::new_local(
|
||||
UserId::new(),
|
||||
Username::new("remote_author").unwrap(),
|
||||
Email::new("r@remote.example").unwrap(),
|
||||
PasswordHash("h".into()),
|
||||
);
|
||||
author.local = false;
|
||||
author.inbox_url = Some("https://mastodon.social/users/author/inbox".into());
|
||||
|
||||
let mut thought = local_thought(author.id.clone());
|
||||
thought.ap_id = Some("https://mastodon.social/posts/123".into());
|
||||
|
||||
let liker = alice();
|
||||
|
||||
store.users.lock().unwrap().push(author.clone());
|
||||
store.users.lock().unwrap().push(liker.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: liker.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(spy.liked.lock().unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn like_added_remote_user_skips_broadcast() {
|
||||
let store = TestStore::default();
|
||||
|
||||
let author = alice();
|
||||
let thought = local_thought(author.id.clone()); // local thought — no ap_id
|
||||
|
||||
let mut remote_liker = User::new_local(
|
||||
UserId::new(),
|
||||
Username::new("bob").unwrap(),
|
||||
Email::new("bob@remote").unwrap(),
|
||||
PasswordHash("h".into()),
|
||||
);
|
||||
remote_liker.local = false;
|
||||
|
||||
store.users.lock().unwrap().push(author);
|
||||
store.users.lock().unwrap().push(remote_liker.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: remote_liker.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.liked.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_added_remote_user_skips_broadcast() {
|
||||
let store = TestStore::default();
|
||||
|
||||
let author = alice();
|
||||
let thought = local_thought(author.id.clone());
|
||||
|
||||
let mut remote_booster = User::new_local(
|
||||
UserId::new(),
|
||||
Username::new("bob").unwrap(),
|
||||
Email::new("bob@remote").unwrap(),
|
||||
PasswordHash("h".into()),
|
||||
);
|
||||
remote_booster.local = false;
|
||||
|
||||
store.users.lock().unwrap().push(author);
|
||||
store.users.lock().unwrap().push(remote_booster.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::BoostAdded {
|
||||
boost_id: BoostId::new(),
|
||||
user_id: remote_booster.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.announced.lock().unwrap().is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,6 +112,23 @@ impl NotificationEventService {
|
||||
})
|
||||
.await
|
||||
}
|
||||
DomainEvent::MentionReceived {
|
||||
thought_id,
|
||||
mentioned_user_id,
|
||||
author_user_id,
|
||||
} => {
|
||||
self.notifications
|
||||
.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: mentioned_user_id.clone(),
|
||||
notification_type: NotificationType::Mention,
|
||||
from_user_id: Some(author_user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
})
|
||||
.await
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::{top_friend::TopFriend, user::User},
|
||||
ports::{TopFriendRepository, UserRepository},
|
||||
ports::{EventPublisher, TopFriendRepository, UserRepository},
|
||||
value_objects::{UserId, Username},
|
||||
};
|
||||
|
||||
@@ -38,8 +39,10 @@ pub async fn get_user_by_id_or_username(
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn update_profile(
|
||||
users: &dyn UserRepository,
|
||||
events: &dyn EventPublisher,
|
||||
user_id: &UserId,
|
||||
display_name: Option<String>,
|
||||
bio: Option<String>,
|
||||
@@ -56,6 +59,11 @@ pub async fn update_profile(
|
||||
header_url,
|
||||
custom_css,
|
||||
)
|
||||
.await?;
|
||||
events
|
||||
.publish(&DomainEvent::ProfileUpdated {
|
||||
user_id: user_id.clone(),
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
|
||||
Arc::new(ThoughtsObjectHandler::new(
|
||||
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||
&cfg.base_url,
|
||||
Some(event_publisher.clone()),
|
||||
)),
|
||||
cfg.base_url.clone(),
|
||||
cfg.allow_registration,
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
mod config;
|
||||
mod factory;
|
||||
|
||||
use activitypub_base::{
|
||||
inbox::inbox_handler,
|
||||
nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler},
|
||||
outbox::outbox_handler,
|
||||
webfinger::webfinger_handler,
|
||||
};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tower_http::cors::{AllowOrigin, CorsLayer};
|
||||
@@ -38,28 +32,8 @@ async fn main() {
|
||||
.allow_headers(tower_http::cors::Any)
|
||||
};
|
||||
|
||||
let ap_router = axum::Router::new()
|
||||
.route(
|
||||
"/.well-known/webfinger",
|
||||
axum::routing::get(webfinger_handler),
|
||||
)
|
||||
.route(
|
||||
"/.well-known/nodeinfo",
|
||||
axum::routing::get(nodeinfo_well_known_handler),
|
||||
)
|
||||
.route("/nodeinfo/2.0", axum::routing::get(nodeinfo_handler))
|
||||
.route(
|
||||
"/users/{username}/inbox",
|
||||
axum::routing::post(inbox_handler),
|
||||
)
|
||||
.route(
|
||||
"/users/{username}/outbox",
|
||||
axum::routing::get(outbox_handler),
|
||||
)
|
||||
.layer(infra.ap_service.federation_config().middleware());
|
||||
|
||||
let base = presentation::routes::router()
|
||||
.merge(ap_router)
|
||||
.merge(infra.ap_service.router::<presentation::state::AppState>())
|
||||
.with_state(infra.state)
|
||||
.layer(cors);
|
||||
|
||||
|
||||
@@ -60,6 +60,9 @@ pub enum DomainEvent {
|
||||
UserRegistered {
|
||||
user_id: UserId,
|
||||
},
|
||||
ProfileUpdated {
|
||||
user_id: UserId,
|
||||
},
|
||||
FetchRemoteActorPosts {
|
||||
actor_ap_url: String,
|
||||
outbox_url: String,
|
||||
@@ -70,6 +73,11 @@ pub enum DomainEvent {
|
||||
connection_type: String,
|
||||
page: u32,
|
||||
},
|
||||
MentionReceived {
|
||||
thought_id: ThoughtId,
|
||||
mentioned_user_id: UserId,
|
||||
author_user_id: UserId,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct EventEnvelope {
|
||||
|
||||
@@ -391,6 +391,7 @@ pub trait ActivityPubRepository: Send + Sync {
|
||||
sensitive: bool,
|
||||
content_warning: Option<String>,
|
||||
visibility: &str,
|
||||
in_reply_to: Option<&url::Url>,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Apply an Update to a previously accepted remote Note.
|
||||
@@ -453,4 +454,24 @@ pub trait OutboundFederationPort: Send + Sync {
|
||||
booster_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Send a Like activity to a remote thought author's inbox.
|
||||
/// Only called when a LOCAL user likes a REMOTE thought (one with an ap_id).
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
liker_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Send Undo(Like) to a remote thought author's inbox.
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
liker_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Fan out an Update(Actor) to all accepted followers after a profile change.
|
||||
async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
@@ -847,6 +847,7 @@ impl ActivityPubRepository for TestStore {
|
||||
_sensitive: bool,
|
||||
_content_warning: Option<String>,
|
||||
_visibility: &str,
|
||||
_in_reply_to: Option<&url::Url>,
|
||||
) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ pub fn to_thought_response(e: &domain::models::feed::FeedEntry) -> ThoughtRespon
|
||||
content: e.thought.content.as_str().to_string(),
|
||||
author: to_user_response(&e.author),
|
||||
in_reply_to_id: e.thought.in_reply_to_id.as_ref().map(|id| id.as_uuid()),
|
||||
in_reply_to_url: e.thought.in_reply_to_url.clone(),
|
||||
visibility: visibility_as_str(&e.thought.visibility).to_string(),
|
||||
content_warning: e.thought.content_warning.clone(),
|
||||
sensitive: e.thought.sensitive,
|
||||
|
||||
@@ -72,6 +72,7 @@ pub async fn patch_profile(
|
||||
) -> Result<Json<UserResponse>, ApiError> {
|
||||
update_profile(
|
||||
&*s.users,
|
||||
&*s.events,
|
||||
&uid,
|
||||
body.display_name,
|
||||
body.bio,
|
||||
|
||||
@@ -46,6 +46,7 @@ pub async fn build(
|
||||
Arc::new(ThoughtsObjectHandler::new(
|
||||
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||
base_url,
|
||||
None,
|
||||
)),
|
||||
base_url.to_string(),
|
||||
false,
|
||||
|
||||
779
docs/superpowers/plans/2026-05-15-ap-likes-boosts.md
Normal file
779
docs/superpowers/plans/2026-05-15-ap-likes-boosts.md
Normal file
@@ -0,0 +1,779 @@
|
||||
# ActivityPub Likes & Boost Notifications Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Wire local likes/unlikes to outbound Like/Undo(Like) AP activities, and handle inbound Like and Announce activities so Mastodon interactions create notifications.
|
||||
|
||||
**Architecture:** Four layers of change — domain port extension, ActivityPubService implementation, application-layer federation event routing, and inbox activity handler registration. Inbound likes/boosts publish domain events (LikeAdded/BoostAdded) so the existing notification service picks them up without duplication. A locality guard in `federation_event.rs` prevents re-broadcasting remote boosts.
|
||||
|
||||
**Tech Stack:** Rust, activitypub_federation crate, async-trait, serde, domain ports.
|
||||
|
||||
---
|
||||
|
||||
## Files
|
||||
|
||||
| Action | File | Purpose |
|
||||
|--------|------|---------|
|
||||
| Modify | `crates/domain/src/ports.rs` | Add `broadcast_like`, `broadcast_undo_like` to `OutboundFederationPort` |
|
||||
| Modify | `crates/application/src/services/federation_event.rs` | Add `liked`/`undo_liked` to SpyPort; add `LikeAdded`/`LikeRemoved` arms; add locality guard to `BoostAdded` |
|
||||
| Modify | `crates/adapters/activitypub-base/src/activities.rs` | Add `LikeActivity` struct; `LikeActivity::receive`; update `AnnounceActivity::receive`; register in `InboxActivities` |
|
||||
| Modify | `crates/adapters/activitypub-base/src/content.rs` | Add `on_like`, `on_announce_received` to `ApObjectHandler` trait |
|
||||
| Modify | `crates/adapters/activitypub-base/src/service.rs` | Add `broadcast_like_to_inbox`, `broadcast_undo_like_to_inbox`; implement port methods |
|
||||
| Modify | `crates/adapters/activitypub/src/handler.rs` | Implement `on_like`, `on_announce_received` in `ThoughtsObjectHandler` |
|
||||
|
||||
---
|
||||
|
||||
## Task 1: Extend OutboundFederationPort + SpyPort
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/domain/src/ports.rs`
|
||||
- Modify: `crates/application/src/services/federation_event.rs`
|
||||
|
||||
- [ ] **Step 1: Add two methods to `OutboundFederationPort` in `crates/domain/src/ports.rs`**
|
||||
|
||||
Find `OutboundFederationPort` (around line 417). Add after `broadcast_undo_announce`:
|
||||
|
||||
```rust
|
||||
/// Send a Like activity to a remote thought author's inbox.
|
||||
/// Only called when a LOCAL user likes a REMOTE thought (one with an ap_id).
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
liker_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Send Undo(Like) to a remote thought author's inbox.
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
liker_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add stubs to `SpyPort` in `crates/application/src/services/federation_event.rs`**
|
||||
|
||||
Find `SpyPort` struct (around line 245). Add two fields:
|
||||
```rust
|
||||
liked: Mutex<Vec<String>>,
|
||||
undo_liked: Mutex<Vec<String>>,
|
||||
```
|
||||
|
||||
Find `impl OutboundFederationPort for SpyPort`. Add after `broadcast_undo_announce`:
|
||||
```rust
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
_: &UserId,
|
||||
ap_id: &str,
|
||||
_: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
self.liked.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
_: &UserId,
|
||||
ap_id: &str,
|
||||
_: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
self.undo_liked.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p domain -p application 2>&1 | grep "^error" | head -10
|
||||
```
|
||||
Expected: no errors (activitypub-base will fail until Task 3 — that's fine, build only those two crates).
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/domain/src/ports.rs crates/application/src/services/federation_event.rs
|
||||
git commit -m "feat(domain): add broadcast_like/broadcast_undo_like to OutboundFederationPort"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2: LikeActivity struct + ApObjectHandler trait methods
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/activities.rs`
|
||||
- Modify: `crates/adapters/activitypub-base/src/content.rs`
|
||||
|
||||
### Part A — LikeActivity struct (activities.rs)
|
||||
|
||||
- [ ] **Step 1: Add `LikeType` and `LikeActivity` to `crates/adapters/activitypub-base/src/activities.rs`**
|
||||
|
||||
Find where `AnnounceType` is defined (around line 13). Add right after:
|
||||
|
||||
```rust
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename = "Like")]
|
||||
pub struct LikeType;
|
||||
|
||||
impl Default for LikeType {
|
||||
fn default() -> Self {
|
||||
Self
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Find where `AnnounceActivity` struct is defined (around line 461). Add a `LikeActivity` struct after it:
|
||||
|
||||
```rust
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LikeActivity {
|
||||
pub id: Url,
|
||||
#[serde(rename = "type")]
|
||||
pub kind: LikeType,
|
||||
pub actor: ObjectId<DbActor>,
|
||||
pub object: Url,
|
||||
}
|
||||
```
|
||||
|
||||
### Part B — ApObjectHandler trait (content.rs)
|
||||
|
||||
- [ ] **Step 2: Add `on_like` and `on_announce_received` to `ApObjectHandler` in `crates/adapters/activitypub-base/src/content.rs`**
|
||||
|
||||
Find the `ApObjectHandler` trait. Add after `on_actor_removed`:
|
||||
|
||||
```rust
|
||||
/// Called when a remote actor likes a local thought.
|
||||
/// `object_url` is the AP URL of the liked note (e.g. `{base}/thoughts/{uuid}`).
|
||||
/// `actor_url` is the AP URL of the remote actor who sent the Like.
|
||||
async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
|
||||
/// Called when a remote actor boosts (Announce) a local thought.
|
||||
/// `object_url` is the AP URL of the announced note.
|
||||
/// `actor_url` is the AP URL of the remote actor who sent the Announce.
|
||||
async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify compilation of activitypub-base**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error" | head -10
|
||||
```
|
||||
Expected: errors that `ThoughtsObjectHandler` in `activitypub` doesn't implement the new methods — that's fine. `activitypub-base` itself should compile.
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/activities.rs \
|
||||
crates/adapters/activitypub-base/src/content.rs
|
||||
git commit -m "feat(activitypub-base): LikeActivity struct + on_like/on_announce_received trait methods"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3: Implement broadcast_like + LikeActivity::receive + AnnounceActivity update
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/service.rs`
|
||||
- Modify: `crates/adapters/activitypub-base/src/activities.rs`
|
||||
|
||||
### Part A — ActivityPubService implementation (service.rs)
|
||||
|
||||
- [ ] **Step 1: Add `broadcast_like_to_inbox` private method to `impl ActivityPubService`**
|
||||
|
||||
Add this private method inside `impl ActivityPubService` (not inside the port impl block):
|
||||
|
||||
```rust
|
||||
pub async fn broadcast_like_to_inbox(
|
||||
&self,
|
||||
liker_user_id: uuid::Uuid,
|
||||
object_ap_id: url::Url,
|
||||
author_inbox_url: url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(liker_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
// Deterministic ID so Undo(Like) can reference the same activity.
|
||||
let like_id = url::Url::parse(&format!(
|
||||
"{}/activities/like/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v5(
|
||||
&uuid::Uuid::NAMESPACE_URL,
|
||||
format!("{}/{}", liker_user_id, object_ap_id).as_bytes(),
|
||||
)
|
||||
))?;
|
||||
|
||||
let like = crate::activities::LikeActivity {
|
||||
id: like_id,
|
||||
kind: Default::default(),
|
||||
actor: activitypub_federation::fetch::object_id::ObjectId::from(
|
||||
local_actor.ap_id.clone(),
|
||||
),
|
||||
object: object_ap_id,
|
||||
};
|
||||
|
||||
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||
&activitypub_federation::protocol::context::WithContext::new_default(like),
|
||||
&local_actor,
|
||||
vec![author_inbox_url],
|
||||
&data,
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some Like deliveries failed permanently");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add `broadcast_undo_like_to_inbox` private method**
|
||||
|
||||
Add directly after `broadcast_like_to_inbox`:
|
||||
|
||||
```rust
|
||||
pub async fn broadcast_undo_like_to_inbox(
|
||||
&self,
|
||||
liker_user_id: uuid::Uuid,
|
||||
object_ap_id: url::Url,
|
||||
author_inbox_url: url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(liker_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
// Reconstruct the same deterministic like ID used when the like was sent.
|
||||
let like_id = url::Url::parse(&format!(
|
||||
"{}/activities/like/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v5(
|
||||
&uuid::Uuid::NAMESPACE_URL,
|
||||
format!("{}/{}", liker_user_id, object_ap_id).as_bytes(),
|
||||
)
|
||||
))?;
|
||||
|
||||
let undo_id =
|
||||
crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let undo = crate::activities::UndoActivity {
|
||||
id: undo_id,
|
||||
kind: Default::default(),
|
||||
actor: activitypub_federation::fetch::object_id::ObjectId::from(
|
||||
local_actor.ap_id.clone(),
|
||||
),
|
||||
object: serde_json::json!({
|
||||
"type": "Like",
|
||||
"id": like_id.to_string(),
|
||||
"actor": local_actor.ap_id.to_string(),
|
||||
"object": object_ap_id.to_string(),
|
||||
}),
|
||||
};
|
||||
|
||||
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||
&activitypub_federation::protocol::context::WithContext::new_default(undo),
|
||||
&local_actor,
|
||||
vec![author_inbox_url],
|
||||
&data,
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some Undo(Like) deliveries failed permanently");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Implement `broadcast_like` and `broadcast_undo_like` in `impl domain::ports::OutboundFederationPort for ActivityPubService`**
|
||||
|
||||
Find the existing `broadcast_undo_announce` impl. Add directly after it:
|
||||
|
||||
```rust
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
liker_user_id: &domain::value_objects::UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let object = url::Url::parse(object_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let inbox = url::Url::parse(author_inbox_url)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_like_to_inbox(liker_user_id.as_uuid(), object, inbox)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
liker_user_id: &domain::value_objects::UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let object = url::Url::parse(object_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let inbox = url::Url::parse(author_inbox_url)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_undo_like_to_inbox(liker_user_id.as_uuid(), object, inbox)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
```
|
||||
|
||||
### Part B — LikeActivity::receive + AnnounceActivity update (activities.rs)
|
||||
|
||||
- [ ] **Step 4: Implement `Activity` for `LikeActivity` in `crates/adapters/activitypub-base/src/activities.rs`**
|
||||
|
||||
Add after the `LikeActivity` struct definition:
|
||||
|
||||
```rust
|
||||
#[async_trait]
|
||||
impl Activity for LikeActivity {
|
||||
type DataType = FederationData;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn actor(&self) -> &Url {
|
||||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
let domain = self.actor().host_str().unwrap_or("");
|
||||
if data.federation_repo.is_domain_blocked(domain).await? {
|
||||
tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Only process if the liked object is on our instance.
|
||||
if self.object.host_str().unwrap_or("") != data.domain {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
data.object_handler
|
||||
.on_like(&self.object, self.actor.inner())
|
||||
.await
|
||||
.map_err(|e| crate::error::Error::Other(e.to_string()))?;
|
||||
|
||||
tracing::info!(actor = %self.actor.inner(), object = %self.object, "received like");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Update `AnnounceActivity::receive` to call `on_announce_received`**
|
||||
|
||||
Find `AnnounceActivity::receive`. After the `add_announce` call and before the `tracing::info!`, add:
|
||||
|
||||
```rust
|
||||
data.object_handler
|
||||
.on_announce_received(&self.object, self.actor.inner())
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process announce notification");
|
||||
});
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Register `LikeActivity` in `InboxActivities` enum**
|
||||
|
||||
Find the `InboxActivities` enum. Add:
|
||||
|
||||
```rust
|
||||
#[serde(rename = "Like")]
|
||||
Like(LikeActivity),
|
||||
```
|
||||
|
||||
- [ ] **Step 7: Verify activitypub-base compiles**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error" | head -10
|
||||
```
|
||||
Expected: no errors from activitypub-base. (`activitypub` crate will fail until Task 4.)
|
||||
|
||||
- [ ] **Step 8: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/service.rs \
|
||||
crates/adapters/activitypub-base/src/activities.rs
|
||||
git commit -m "feat(activitypub-base): broadcast_like/undo_like + LikeActivity inbox handler"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4: Implement on_like and on_announce_received in ThoughtsObjectHandler
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub/src/handler.rs`
|
||||
|
||||
`ThoughtsObjectHandler` has `ap_repo: Arc<dyn ActivityPubRepository>` and `event_publisher: Option<Arc<dyn EventPublisher>>`. These are all we need.
|
||||
|
||||
Pattern for both methods:
|
||||
1. Parse the thought UUID out of the object URL path (`/thoughts/{uuid}`)
|
||||
2. Find the remote actor's local user ID via `ap_repo.find_remote_actor_id(actor_url)`
|
||||
3. Publish the appropriate domain event — the notification service already handles `LikeAdded` and `BoostAdded`
|
||||
|
||||
- [ ] **Step 1: Read `crates/adapters/activitypub/src/handler.rs` to understand the struct and existing impls**
|
||||
|
||||
Look for `struct ThoughtsObjectHandler` and `impl ApObjectHandler for ThoughtsObjectHandler`.
|
||||
|
||||
- [ ] **Step 2: Implement `on_like` in `impl ApObjectHandler for ThoughtsObjectHandler`**
|
||||
|
||||
Add:
|
||||
|
||||
```rust
|
||||
async fn on_like(&self, object_url: &url::Url, actor_url: &url::Url) -> anyhow::Result<()> {
|
||||
// Parse thought UUID from path like /thoughts/{uuid}
|
||||
let thought_uuid = object_url
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => {
|
||||
tracing::debug!(object = %object_url, "on_like: not a local thought URL, skipping");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
// Resolve the remote actor to a local user ID.
|
||||
let actor_user_id = self
|
||||
.ap_repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let actor_user_id = match actor_user_id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
tracing::debug!(actor = %actor_url, "on_like: remote actor not interned, skipping notification");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid);
|
||||
let like_id = domain::value_objects::LikeId::new();
|
||||
ep.publish(&domain::events::DomainEvent::LikeAdded {
|
||||
like_id,
|
||||
user_id: actor_user_id,
|
||||
thought_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Implement `on_announce_received`**
|
||||
|
||||
Add directly after `on_like`:
|
||||
|
||||
```rust
|
||||
async fn on_announce_received(
|
||||
&self,
|
||||
object_url: &url::Url,
|
||||
actor_url: &url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
// Parse thought UUID from path like /thoughts/{uuid}
|
||||
let thought_uuid = object_url
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let actor_user_id = self
|
||||
.ap_repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let actor_user_id = match actor_user_id {
|
||||
Some(id) => id,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid);
|
||||
let boost_id = domain::value_objects::BoostId::new();
|
||||
ep.publish(&domain::events::DomainEvent::BoostAdded {
|
||||
boost_id,
|
||||
user_id: actor_user_id,
|
||||
thought_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Verify full workspace build**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -10
|
||||
```
|
||||
Expected: no errors.
|
||||
|
||||
- [ ] **Step 5: Run tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub/src/handler.rs
|
||||
git commit -m "feat(activitypub): implement on_like and on_announce_received in ThoughtsObjectHandler"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5: federation_event.rs — LikeAdded/LikeRemoved arms + BoostAdded locality guard
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/application/src/services/federation_event.rs`
|
||||
|
||||
The federation service must:
|
||||
- **BoostAdded**: add a locality check so remote boosts (published by Task 4) don't get re-broadcast
|
||||
- **LikeAdded**: fan-out only when a LOCAL user likes a REMOTE thought (has ap_id)
|
||||
- **LikeRemoved**: Undo(Like) when a LOCAL user unlikes a REMOTE thought
|
||||
|
||||
- [ ] **Step 1: Write tests for the new arms**
|
||||
|
||||
Find the `#[cfg(test)]` block in `crates/application/src/services/federation_event.rs`. Add:
|
||||
|
||||
```rust
|
||||
#[tokio::test]
|
||||
async fn like_added_local_user_remote_thought_broadcasts_like() {
|
||||
let store = TestStore::default();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
|
||||
// Set up a remote thought with ap_id
|
||||
let author = {
|
||||
let mut u = test_user("remote_author");
|
||||
u.local = false;
|
||||
u.inbox_url = Some("https://mastodon.social/users/author/inbox".into());
|
||||
u
|
||||
};
|
||||
let thought = {
|
||||
let mut t = test_thought(author.id.clone());
|
||||
t.ap_id = Some("https://mastodon.social/posts/123".into());
|
||||
t.in_reply_to_url = None;
|
||||
t
|
||||
};
|
||||
let liker = test_user("alice"); // local user
|
||||
|
||||
store.users.lock().unwrap().push(author);
|
||||
store.users.lock().unwrap().push(liker.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let svc = test_service(store, spy.clone());
|
||||
svc.process(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: liker.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(spy.liked.lock().unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn like_added_remote_user_skips_broadcast() {
|
||||
let store = TestStore::default();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
|
||||
let author = test_user("alice");
|
||||
let thought = test_thought(author.id.clone()); // local thought, no ap_id
|
||||
let remote_liker = {
|
||||
let mut u = test_user("bob");
|
||||
u.local = false;
|
||||
u
|
||||
};
|
||||
|
||||
store.users.lock().unwrap().push(author);
|
||||
store.users.lock().unwrap().push(remote_liker.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let svc = test_service(store, spy.clone());
|
||||
svc.process(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: remote_liker.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.liked.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_added_remote_user_skips_broadcast() {
|
||||
let store = TestStore::default();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
|
||||
let author = test_user("alice");
|
||||
let thought = test_thought(author.id.clone());
|
||||
let remote_booster = {
|
||||
let mut u = test_user("bob");
|
||||
u.local = false;
|
||||
u
|
||||
};
|
||||
|
||||
store.users.lock().unwrap().push(author);
|
||||
store.users.lock().unwrap().push(remote_booster.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let svc = test_service(store, spy.clone());
|
||||
svc.process(&DomainEvent::BoostAdded {
|
||||
boost_id: BoostId::new(),
|
||||
user_id: remote_booster.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.announced.lock().unwrap().is_empty());
|
||||
}
|
||||
```
|
||||
|
||||
Note: these tests use `test_user`, `test_thought`, `test_service` helpers — read the existing tests in the same file to find these helpers and use the same pattern. If `User.local` field setters don't exist, set the field directly (it's `pub`).
|
||||
|
||||
- [ ] **Step 2: Run tests to confirm they fail**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p application federation_event 2>&1 | grep "FAILED\|error" | head -10
|
||||
```
|
||||
Expected: tests fail (LikeAdded arm not handled, BoostAdded has no locality guard).
|
||||
|
||||
- [ ] **Step 3: Add locality guard to existing `BoostAdded` arm**
|
||||
|
||||
Find the `DomainEvent::BoostAdded` match arm. Add a locality check at the top:
|
||||
|
||||
```rust
|
||||
DomainEvent::BoostAdded {
|
||||
boost_id: _,
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
// Only fan-out if the booster is a local user. Remote boosts (from inbound
|
||||
// Announce activities) must not be re-broadcast to avoid loops.
|
||||
let booster = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = booster; // suppress unused warning — kept for the locality check
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let object_ap_id = self.object_ap_id(&thought, thought_id);
|
||||
self.ap.broadcast_announce(user_id, &object_ap_id).await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add `LikeAdded` arm**
|
||||
|
||||
Find the `_ => Ok(())` catch-all at the end of the `match event` block. Add before it:
|
||||
|
||||
```rust
|
||||
DomainEvent::LikeAdded {
|
||||
like_id: _,
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
// Only federate: local liker + remote thought (has ap_id + author has inbox).
|
||||
let liker = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = liker;
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.ap_id.is_some() => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let author = match self.users.find_by_id(&thought.user_id).await? {
|
||||
Some(u) if u.inbox_url.is_some() => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.unwrap();
|
||||
let inbox_url = author.inbox_url.unwrap();
|
||||
self.ap
|
||||
.broadcast_like(user_id, &object_ap_id, &inbox_url)
|
||||
.await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Add `LikeRemoved` arm**
|
||||
|
||||
Add directly after `LikeAdded`:
|
||||
|
||||
```rust
|
||||
DomainEvent::LikeRemoved {
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
let liker = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = liker;
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.ap_id.is_some() => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let author = match self.users.find_by_id(&thought.user_id).await? {
|
||||
Some(u) if u.inbox_url.is_some() => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.unwrap();
|
||||
let inbox_url = author.inbox_url.unwrap();
|
||||
self.ap
|
||||
.broadcast_undo_like(user_id, &object_ap_id, &inbox_url)
|
||||
.await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Run tests — all should pass**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p application federation_event 2>&1 | tail -5
|
||||
```
|
||||
Expected: all pass.
|
||||
|
||||
- [ ] **Step 7: Full build + all unit tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 8: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/application/src/services/federation_event.rs
|
||||
git commit -m "feat(application): federate local likes + locality guard prevents remote boost re-broadcast"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Notes
|
||||
|
||||
- **No loop risk**: The `BoostAdded` locality guard (`u.local`) ensures remote boosts published by `on_announce_received` skip federation fan-out. Same guard applies to `LikeAdded`.
|
||||
- **Existing notification service**: `LikeAdded` and `BoostAdded` events published from inbound activity handlers are picked up by `NotificationEventService` unchanged — it already creates notifications for these events.
|
||||
- **Deterministic activity IDs**: Like and Undo(Like) use `Uuid::new_v5(NAMESPACE_URL, "{user}/{object}")` so the Undo can reference the original Like ID without DB storage.
|
||||
- **Only remote thoughts get likes federated**: Local thoughts liked by local users generate no outbound activity (the like is already recorded locally).
|
||||
781
docs/superpowers/plans/2026-05-15-federation-gaps-2.md
Normal file
781
docs/superpowers/plans/2026-05-15-federation-gaps-2.md
Normal file
@@ -0,0 +1,781 @@
|
||||
# Federation Gaps — Round 2 Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Fix seven federation gaps: HTML content format, hashtag federation, Undo(Like) inbound, Update(Actor) on profile change, @mention notifications, remote posts in home feed, and orphaned reply parent display.
|
||||
|
||||
**Architecture:** Backend changes span the AP adapter layer (activities.rs, service.rs, handler.rs), application layer (use cases, event service), and postgres adapter (feed.rs). Frontend changes are limited to api.ts and thought-card.tsx. All changes follow the existing hexagonal pattern — no business logic in presentation, domain events for cross-cutting concerns.
|
||||
|
||||
**Tech Stack:** Rust / axum / sqlx / activitypub_federation crate; Next.js 15 / TypeScript / Zod.
|
||||
|
||||
---
|
||||
|
||||
## Files Modified
|
||||
|
||||
| Task | File | Change |
|
||||
|------|------|--------|
|
||||
| 1 | `crates/adapters/activitypub-base/src/service.rs` | Wrap content in `<p>` tags with HTML escaping |
|
||||
| 2 | `crates/adapters/activitypub/src/note.rs` | Add `tag` field to ThoughtNote |
|
||||
| 2 | `crates/adapters/activitypub-base/src/service.rs` | Extract hashtags and add to Note JSON |
|
||||
| 3 | `crates/adapters/activitypub-base/src/activities.rs` | Add "Like" arm to UndoActivity::receive |
|
||||
| 4 | `crates/domain/src/ports.rs` | Add `broadcast_actor_update` to OutboundFederationPort |
|
||||
| 4 | `crates/domain/src/events.rs` | Add `ProfileUpdated` variant |
|
||||
| 4 | `crates/domain/src/testing.rs` | Add SpyPort stub for broadcast_actor_update |
|
||||
| 4 | `crates/application/src/use_cases/profile.rs` | Publish ProfileUpdated from update_profile |
|
||||
| 4 | `crates/application/src/services/federation_event.rs` | Handle ProfileUpdated → broadcast_actor_update |
|
||||
| 4 | `crates/adapters/activitypub-base/src/service.rs` | Implement broadcast_actor_update port method |
|
||||
| 5 | `crates/adapters/activitypub/src/note.rs` | Add `tag` deserialization field |
|
||||
| 5 | `crates/adapters/activitypub-base/src/content.rs` | Add `on_mention` to ApObjectHandler |
|
||||
| 5 | `crates/adapters/activitypub/src/handler.rs` | Parse Mention tags, implement on_mention |
|
||||
| 5 | `crates/domain/src/events.rs` | Add `MentionReceived` variant |
|
||||
| 5 | `crates/domain/src/testing.rs` | No-op on_mention in TestStore impl |
|
||||
| 5 | `crates/application/src/services/notification_event.rs` | Handle MentionReceived |
|
||||
| 6 | `crates/adapters/postgres/src/feed.rs` | Extend home_feed SQL to include federation_following |
|
||||
| 7 | `crates/api-types/src/responses.rs` | Add `in_reply_to_url` to ThoughtResponse |
|
||||
| 7 | `crates/presentation/src/handlers/feed.rs` | Map in_reply_to_url into response |
|
||||
| 7 | `thoughts-frontend/lib/api.ts` | Add replyToUrl to ThoughtSchema |
|
||||
| 7 | `thoughts-frontend/components/thought-card.tsx` | Show external reply link when replyToUrl set |
|
||||
|
||||
---
|
||||
|
||||
## Task 1: HTML content in outbound Notes
|
||||
|
||||
Mastodon and other AP servers expect HTML, not plain text. Wrap content in `<p>` tags and escape HTML entities. Multi-paragraph posts (newlines) get multiple `<p>` elements.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/service.rs` (function `thought_note_json`)
|
||||
|
||||
- [ ] **Step 1: Add a private HTML-escaping helper near the top of service.rs**
|
||||
|
||||
Read `crates/adapters/activitypub-base/src/service.rs`. Find `fn thought_note_json`. Add this private function just before it:
|
||||
|
||||
```rust
|
||||
fn content_to_html(text: &str) -> String {
|
||||
let escaped = text
|
||||
.replace('&', "&")
|
||||
.replace('<', "<")
|
||||
.replace('>', ">")
|
||||
.replace('"', """);
|
||||
let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect();
|
||||
if paragraphs.is_empty() {
|
||||
format!("<p>{}</p>", escaped)
|
||||
} else {
|
||||
paragraphs
|
||||
.iter()
|
||||
.map(|p| format!("<p>{}</p>", p))
|
||||
.collect::<Vec<_>>()
|
||||
.join("")
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Use `content_to_html` in `thought_note_json`**
|
||||
|
||||
In `thought_note_json`, find:
|
||||
```rust
|
||||
"content": thought.content.as_str(),
|
||||
```
|
||||
Replace with:
|
||||
```rust
|
||||
"content": content_to_html(thought.content.as_str()),
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error"
|
||||
```
|
||||
Expected: no errors.
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/service.rs
|
||||
git commit -m "fix(ap): wrap outbound Note content in HTML paragraph tags"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2: Hashtag federation
|
||||
|
||||
Outbound Notes must include a `tag` array with Hashtag objects so Mastodon can index posts by hashtag. Extract `#word` patterns from content and add to the Note JSON.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/service.rs` (`thought_note_json`)
|
||||
|
||||
- [ ] **Step 1: Add a hashtag-extraction helper in service.rs**
|
||||
|
||||
Add this function near `content_to_html` (already added in Task 1):
|
||||
|
||||
```rust
|
||||
fn extract_hashtag_tags(content: &str, base_url: &str) -> Vec<serde_json::Value> {
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
let mut tags = Vec::new();
|
||||
for word in content.split_whitespace() {
|
||||
let tag = word.trim_matches(|c: char| !c.is_alphanumeric() && c != '#');
|
||||
if let Some(name) = tag.strip_prefix('#') {
|
||||
if !name.is_empty() && seen.insert(name.to_lowercase()) {
|
||||
let lower = name.to_lowercase();
|
||||
tags.push(serde_json::json!({
|
||||
"type": "Hashtag",
|
||||
"name": format!("#{}", lower),
|
||||
"href": format!("{}/tags/{}", base_url, lower),
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
tags
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add hashtag tags to the Note JSON in `thought_note_json`**
|
||||
|
||||
In `thought_note_json`, after the closing `}` of `let mut note = serde_json::json!({...})`, add:
|
||||
|
||||
```rust
|
||||
let hashtag_tags = extract_hashtag_tags(thought.content.as_str(), base_url);
|
||||
if !hashtag_tags.is_empty() {
|
||||
note["tag"] = serde_json::json!(hashtag_tags);
|
||||
}
|
||||
```
|
||||
|
||||
Note: `base_url` is already a parameter of `thought_note_json(&self, thought, local_actor, base_url)` — use it directly.
|
||||
|
||||
- [ ] **Step 3: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error"
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/service.rs
|
||||
git commit -m "feat(ap): add hashtag tag array to outbound Notes"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3: Undo(Like) inbound handler
|
||||
|
||||
When a remote user unlikes a local post, we should acknowledge it. Add a "Like" arm to `UndoActivity::receive` that calls `on_unlike` on the object handler. The `on_unlike` impl will be a no-op (we don't store remote likes in the likes table, only notifications — removing them requires more infrastructure). This prevents the "ignoring Undo of unknown activity type" log spam.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/activities.rs`
|
||||
- Modify: `crates/adapters/activitypub-base/src/content.rs`
|
||||
- Modify: `crates/adapters/activitypub/src/handler.rs`
|
||||
|
||||
- [ ] **Step 1: Add `on_unlike` to `ApObjectHandler` trait in `content.rs`**
|
||||
|
||||
Read `crates/adapters/activitypub-base/src/content.rs`. Find `ApObjectHandler`. Add after `on_announce_received`:
|
||||
|
||||
```rust
|
||||
/// Called when a remote actor removes a Like from a local thought.
|
||||
async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add no-op `on_unlike` to `ThoughtsObjectHandler` in `handler.rs`**
|
||||
|
||||
Read `crates/adapters/activitypub/src/handler.rs`. Add after `on_announce_received`:
|
||||
|
||||
```rust
|
||||
async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Add "Like" arm to `UndoActivity::receive` in `activities.rs`**
|
||||
|
||||
Read `crates/adapters/activitypub-base/src/activities.rs`. Find `UndoActivity::receive`. Find the `match obj_type` block. Add before the `other =>` catch-all:
|
||||
|
||||
```rust
|
||||
"Like" => {
|
||||
if let Some(obj_url_str) = self.object.get("object").and_then(|o| o.as_str())
|
||||
&& let Ok(obj_url) = Url::parse(obj_url_str)
|
||||
&& obj_url.host_str().unwrap_or("") == data.domain
|
||||
{
|
||||
data.object_handler
|
||||
.on_unlike(&obj_url, self.actor.inner())
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process unlike");
|
||||
});
|
||||
}
|
||||
tracing::info!(actor = %self.actor.inner(), "received Undo(Like)");
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error"
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/activities.rs \
|
||||
crates/adapters/activitypub-base/src/content.rs \
|
||||
crates/adapters/activitypub/src/handler.rs
|
||||
git commit -m "feat(ap): handle Undo(Like) inbound activity"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4: Update(Actor) outbound on profile change
|
||||
|
||||
When a user updates their profile (display name, bio, avatar), broadcast an `Update(Actor)` activity to their AP followers. The `broadcast_actor_update` method already exists on `ActivityPubService` — it just needs to be exposed as a port method and wired through the event system.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/domain/src/ports.rs` — add to OutboundFederationPort
|
||||
- Modify: `crates/domain/src/events.rs` — add ProfileUpdated variant
|
||||
- Modify: `crates/domain/src/testing.rs` — SpyPort stub
|
||||
- Modify: `crates/application/src/use_cases/profile.rs` — publish event
|
||||
- Modify: `crates/application/src/services/federation_event.rs` — handle event
|
||||
- Modify: `crates/adapters/activitypub-base/src/service.rs` — implement port method
|
||||
|
||||
- [ ] **Step 1: Add `ProfileUpdated` to `DomainEvent` in `crates/domain/src/events.rs`**
|
||||
|
||||
Read the file. Add to the enum:
|
||||
|
||||
```rust
|
||||
ProfileUpdated { user_id: UserId },
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add `broadcast_actor_update` to `OutboundFederationPort` in `crates/domain/src/ports.rs`**
|
||||
|
||||
Find `OutboundFederationPort`. Add after `broadcast_undo_like`:
|
||||
|
||||
```rust
|
||||
/// Broadcast Update(Actor) to all accepted followers when a user updates their profile.
|
||||
async fn broadcast_actor_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
) -> Result<(), DomainError>;
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Add stub to `SpyPort` in `crates/application/src/services/federation_event.rs`**
|
||||
|
||||
Find `SpyPort` struct. Add field:
|
||||
```rust
|
||||
actor_updated: Mutex<Vec<UserId>>,
|
||||
```
|
||||
|
||||
Find `impl OutboundFederationPort for SpyPort`. Add:
|
||||
```rust
|
||||
async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError> {
|
||||
self.actor_updated.lock().unwrap().push(user_id.clone());
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add `EventPublisher` to `update_profile` use case in `crates/application/src/use_cases/profile.rs`**
|
||||
|
||||
Read the file. Find `pub async fn update_profile(...)`. Add `events: &dyn EventPublisher` as a parameter and import it. Publish `ProfileUpdated` after the update:
|
||||
|
||||
```rust
|
||||
pub async fn update_profile(
|
||||
users: &dyn UserRepository,
|
||||
events: &dyn EventPublisher,
|
||||
user_id: &UserId,
|
||||
display_name: Option<String>,
|
||||
bio: Option<String>,
|
||||
avatar_url: Option<String>,
|
||||
header_url: Option<String>,
|
||||
custom_css: Option<String>,
|
||||
) -> Result<(), DomainError> {
|
||||
users
|
||||
.update_profile(user_id, display_name, bio, avatar_url, header_url, custom_css)
|
||||
.await?;
|
||||
events
|
||||
.publish(&DomainEvent::ProfileUpdated {
|
||||
user_id: user_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
Make sure `DomainEvent` and `EventPublisher` are imported at the top of profile.rs. Check the existing imports and add what's missing.
|
||||
|
||||
- [ ] **Step 5: Update all callers of `update_profile` to pass `&*s.events`**
|
||||
|
||||
`update_profile` is called from `crates/presentation/src/handlers/users.rs`. Read that file. Find the `patch_profile` handler call to `update_profile`. Add `&*s.events` as the second argument:
|
||||
|
||||
```rust
|
||||
update_profile(
|
||||
&*s.users,
|
||||
&*s.events,
|
||||
&uid,
|
||||
body.display_name,
|
||||
body.bio,
|
||||
body.avatar_url,
|
||||
body.header_url,
|
||||
body.custom_css,
|
||||
)
|
||||
.await?;
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Implement `broadcast_actor_update` port method in `ActivityPubService` in `crates/adapters/activitypub-base/src/service.rs`**
|
||||
|
||||
Find `impl domain::ports::OutboundFederationPort for ActivityPubService`. Add after `broadcast_undo_like`:
|
||||
|
||||
```rust
|
||||
async fn broadcast_actor_update(
|
||||
&self,
|
||||
user_id: &domain::value_objects::UserId,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
self.broadcast_actor_update(user_id.as_uuid())
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
```
|
||||
|
||||
Note: this calls the existing private `broadcast_actor_update(uuid)` method on `ActivityPubService`.
|
||||
|
||||
- [ ] **Step 7: Handle `ProfileUpdated` in `federation_event.rs`**
|
||||
|
||||
Find the `match event` block. Add before the catch-all `_ => Ok(())`:
|
||||
|
||||
```rust
|
||||
DomainEvent::ProfileUpdated { user_id } => {
|
||||
self.ap.broadcast_actor_update(user_id).await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 8: Verify build and tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5
|
||||
cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 9: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/domain/src/events.rs \
|
||||
crates/domain/src/ports.rs \
|
||||
crates/domain/src/testing.rs \
|
||||
crates/application/src/use_cases/profile.rs \
|
||||
crates/application/src/services/federation_event.rs \
|
||||
crates/presentation/src/handlers/users.rs \
|
||||
crates/adapters/activitypub-base/src/service.rs
|
||||
git commit -m "feat(ap): broadcast Update(Actor) when user updates their profile"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5: @mention notification
|
||||
|
||||
When a remote Note arrives with a Mention tag pointing to a local user, create a notification. The Note's `tag` array contains objects like `{"type":"Mention","href":"https://our.instance/users/{uuid}","name":"@user@domain"}`.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub/src/note.rs` — add tag field
|
||||
- Modify: `crates/adapters/activitypub-base/src/content.rs` — add on_mention to trait
|
||||
- Modify: `crates/adapters/activitypub/src/handler.rs` — parse tags, implement on_mention
|
||||
- Modify: `crates/domain/src/events.rs` — add MentionReceived
|
||||
- Modify: `crates/domain/src/testing.rs` — no-op on_mention
|
||||
- Modify: `crates/application/src/services/notification_event.rs` — handle MentionReceived
|
||||
|
||||
- [ ] **Step 1: Add `tag` field to `ThoughtNote` in `crates/adapters/activitypub/src/note.rs`**
|
||||
|
||||
Read the file. Add to the `ThoughtNote` struct:
|
||||
|
||||
```rust
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub tag: Vec<serde_json::Value>,
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add `MentionReceived` to `DomainEvent` in `crates/domain/src/events.rs`**
|
||||
|
||||
Add to the enum:
|
||||
|
||||
```rust
|
||||
MentionReceived {
|
||||
thought_id: ThoughtId,
|
||||
mentioned_user_id: UserId,
|
||||
author_user_id: UserId,
|
||||
},
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Add `on_mention` to `ApObjectHandler` in `crates/adapters/activitypub-base/src/content.rs`**
|
||||
|
||||
Add after `on_unlike`:
|
||||
|
||||
```rust
|
||||
/// Called once per @mention of a local user in a remote Note.
|
||||
/// `thought_ap_id` is the AP URL of the Note, `mentioned_user_id` is the UUID
|
||||
/// of the local user being mentioned, `actor_url` is the remote author's AP URL.
|
||||
async fn on_mention(
|
||||
&self,
|
||||
thought_ap_id: &Url,
|
||||
mentioned_user_uuid: uuid::Uuid,
|
||||
actor_url: &Url,
|
||||
) -> anyhow::Result<()>;
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add no-op `on_mention` to `TestStore`'s `ApObjectHandler` impl in `crates/domain/src/testing.rs`**
|
||||
|
||||
Note: `TestStore` does NOT implement `ApObjectHandler` — that's `ThoughtsObjectHandler` in the activitypub adapter. Instead, find if there is a test double or just implement in handler.rs directly (step 5 below covers it).
|
||||
|
||||
- [ ] **Step 5: Implement `on_mention` in `ThoughtsObjectHandler` in `crates/adapters/activitypub/src/handler.rs`**
|
||||
|
||||
Add after `on_unlike`:
|
||||
|
||||
```rust
|
||||
async fn on_mention(
|
||||
&self,
|
||||
thought_ap_id: &url::Url,
|
||||
mentioned_user_uuid: uuid::Uuid,
|
||||
actor_url: &url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
// Resolve remote author to a local user ID.
|
||||
let author_user_id = match self
|
||||
.repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?
|
||||
{
|
||||
Some(id) => id,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
// Extract thought UUID from /thoughts/{uuid} path.
|
||||
let thought_uuid = thought_ap_id
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
ep.publish(&domain::events::DomainEvent::MentionReceived {
|
||||
thought_id: domain::value_objects::ThoughtId::from_uuid(thought_uuid),
|
||||
mentioned_user_id: domain::value_objects::UserId::from_uuid(mentioned_user_uuid),
|
||||
author_user_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Parse Mention tags and call `on_mention` in `ThoughtsObjectHandler::on_create`**
|
||||
|
||||
Find `on_create`. After the `accept_note(...)` call, add:
|
||||
|
||||
```rust
|
||||
// Fire mention notifications for any local @mentions in the note's tag array.
|
||||
let local_domain = self.urls.base_url().host_str().unwrap_or("");
|
||||
for tag in ¬e.tag {
|
||||
if tag.get("type").and_then(|t| t.as_str()) != Some("Mention") {
|
||||
continue;
|
||||
}
|
||||
let href = match tag.get("href").and_then(|h| h.as_str()) {
|
||||
Some(h) => h,
|
||||
None => continue,
|
||||
};
|
||||
let href_url = match url::Url::parse(href) {
|
||||
Ok(u) => u,
|
||||
Err(_) => continue,
|
||||
};
|
||||
// Only process mentions of local users (UUID-based /users/{uuid} paths).
|
||||
if href_url.host_str().unwrap_or("") != local_domain {
|
||||
continue;
|
||||
}
|
||||
let user_uuid = href_url
|
||||
.path()
|
||||
.strip_prefix("/users/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
if let Some(uuid) = user_uuid {
|
||||
self.on_mention(ap_id, uuid, actor_url)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process mention notification");
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Note: `self.urls.base_url()` — check `ThoughtsUrls` for how to get the base URL `Url`. If not available, parse `self.urls` fields or add a helper. Check the `ThoughtsUrls` struct in `crates/adapters/activitypub/src/urls.rs`.
|
||||
|
||||
- [ ] **Step 7: Handle `MentionReceived` in `crates/application/src/services/notification_event.rs`**
|
||||
|
||||
Find the `match event` block. Add before the `_ => Ok(())` catch-all:
|
||||
|
||||
```rust
|
||||
DomainEvent::MentionReceived {
|
||||
thought_id,
|
||||
mentioned_user_id,
|
||||
author_user_id,
|
||||
} => {
|
||||
self.notifications
|
||||
.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: mentioned_user_id.clone(),
|
||||
notification_type: NotificationType::Mention,
|
||||
from_user_id: Some(author_user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
})
|
||||
.await
|
||||
}
|
||||
```
|
||||
|
||||
Make sure `NotificationType::Mention` is a variant — check `crates/domain/src/models/notification.rs`. It already has `Mention` variant.
|
||||
|
||||
- [ ] **Step 8: Verify build and tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -10
|
||||
cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 9: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub/src/note.rs \
|
||||
crates/adapters/activitypub-base/src/content.rs \
|
||||
crates/adapters/activitypub/src/handler.rs \
|
||||
crates/domain/src/events.rs \
|
||||
crates/application/src/services/notification_event.rs
|
||||
git commit -m "feat(ap): @mention notification from inbound remote Notes"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 6: Remote posts in home feed
|
||||
|
||||
The `home_feed` SQL currently only includes thoughts from users in the `follows` table (local follows). Remote follows are in `federation_following`, so remote users' posts never appear. Extend the SQL to also include thoughts from users whose AP URL is in `federation_following` for the viewer.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/postgres/src/feed.rs`
|
||||
|
||||
- [ ] **Step 1: Read `crates/adapters/postgres/src/feed.rs` in full**
|
||||
|
||||
Focus on `fn feed_select(viewer: Option<uuid::Uuid>) -> String` and `async fn home_feed(...)`.
|
||||
|
||||
Key insight: `feed_select` embeds `viewer` UUID directly into the SQL string (not as a bind parameter). The home_feed SQL uses `$1` (following_ids), `$2` (limit), `$3` (offset) as bind params.
|
||||
|
||||
- [ ] **Step 2: Add `follower` parameter to `feed_select`**
|
||||
|
||||
Change the signature to:
|
||||
```rust
|
||||
fn feed_select(viewer: Option<uuid::Uuid>, follower: Option<uuid::Uuid>) -> String
|
||||
```
|
||||
|
||||
At the top of the function body, generate a federation following subquery:
|
||||
```rust
|
||||
let federation_clause = match follower {
|
||||
Some(fid) => format!(
|
||||
"OR t.user_id IN (
|
||||
SELECT u2.id FROM users u2
|
||||
JOIN federation_following ff ON u2.ap_id = ff.remote_actor_url
|
||||
WHERE ff.local_user_id = '{fid}'
|
||||
)"
|
||||
),
|
||||
None => String::new(),
|
||||
};
|
||||
```
|
||||
|
||||
This string is used in step 3's WHERE clause modification.
|
||||
|
||||
Since `feed_select` generates only the SELECT part (not WHERE), the `federation_clause` needs to be returned somehow. Options:
|
||||
- Return a tuple `(select_str, federation_clause)` from `feed_select`
|
||||
- Or add a separate helper `fn federation_following_clause(follower: Option<uuid::Uuid>) -> String`
|
||||
|
||||
Use option B — separate helper — to avoid changing `feed_select`'s return type:
|
||||
|
||||
```rust
|
||||
fn federation_following_clause(follower: Option<uuid::Uuid>) -> String {
|
||||
match follower {
|
||||
Some(fid) => format!(
|
||||
" OR t.user_id IN (
|
||||
SELECT u2.id FROM users u2
|
||||
JOIN federation_following ff ON u2.ap_id = ff.remote_actor_url
|
||||
WHERE ff.local_user_id = '{fid}'
|
||||
)"
|
||||
),
|
||||
None => String::new(),
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Leave `feed_select` signature unchanged.
|
||||
|
||||
- [ ] **Step 3: Modify `home_feed` to use `federation_following_clause`**
|
||||
|
||||
Find the `home_feed` method. The viewer_id is the feed owner (the logged-in user), which is also the person whose federation_following we want.
|
||||
|
||||
Replace:
|
||||
```rust
|
||||
let viewer = viewer_id.map(|v| v.as_uuid());
|
||||
// ...
|
||||
let total: i64 = sqlx::query_scalar(
|
||||
"SELECT COUNT(*) FROM thoughts t WHERE t.user_id=ANY($1) AND t.visibility != 'direct'",
|
||||
)
|
||||
```
|
||||
|
||||
With:
|
||||
```rust
|
||||
let viewer = viewer_id.map(|v| v.as_uuid());
|
||||
let fed_clause = federation_following_clause(viewer);
|
||||
let total: i64 = sqlx::query_scalar(&format!(
|
||||
"SELECT COUNT(*) FROM thoughts t WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct'",
|
||||
fed_clause
|
||||
))
|
||||
```
|
||||
|
||||
And replace:
|
||||
```rust
|
||||
let sql = format!("{sel} WHERE t.user_id=ANY($1) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3");
|
||||
```
|
||||
|
||||
With:
|
||||
```rust
|
||||
let sql = format!("{sel} WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3", fed_clause);
|
||||
```
|
||||
|
||||
The rest of the bindings (`$1`, `$2`, `$3`) stay unchanged.
|
||||
|
||||
- [ ] **Step 4: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p postgres 2>&1 | grep "^error" | head -5
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Run all tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/postgres/src/feed.rs
|
||||
git commit -m "feat(feed): include remote following posts in home feed"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 7: Reply parent display + API field
|
||||
|
||||
Remote posts that are replies show without context because:
|
||||
1. `ThoughtResponse` doesn't expose `in_reply_to_url` (the external URL of the parent)
|
||||
2. The frontend doesn't link to the parent when it's external
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/api-types/src/responses.rs`
|
||||
- Modify: `crates/presentation/src/handlers/feed.rs` (or wherever `to_thought_response` is defined)
|
||||
- Modify: `thoughts-frontend/lib/api.ts`
|
||||
- Modify: `thoughts-frontend/components/thought-card.tsx`
|
||||
|
||||
- [ ] **Step 1: Add `in_reply_to_url` to `ThoughtResponse` in `crates/api-types/src/responses.rs`**
|
||||
|
||||
Read the file. Find `ThoughtResponse` struct. Add after `reply_to_id`:
|
||||
|
||||
```rust
|
||||
#[serde(rename = "replyToUrl", skip_serializing_if = "Option::is_none")]
|
||||
pub in_reply_to_url: Option<String>,
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Map `in_reply_to_url` in the response builder**
|
||||
|
||||
Find where `ThoughtResponse` is constructed from a `Thought` (search for `ThoughtResponse {` or `to_thought_response`). Add the mapping:
|
||||
|
||||
```rust
|
||||
in_reply_to_url: thought.in_reply_to_url.clone(),
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify backend compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add `replyToUrl` to `ThoughtSchema` in `thoughts-frontend/lib/api.ts`**
|
||||
|
||||
Find `ThoughtSchema`. Add:
|
||||
|
||||
```typescript
|
||||
replyToUrl: z.string().url().nullable().optional(),
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Update `thought-card.tsx` to show external reply link**
|
||||
|
||||
Read `thoughts-frontend/components/thought-card.tsx`. Find the section that renders `thought.replyToId`. It currently shows "Replying to parent thought" with a hash link only when `isReply` is true.
|
||||
|
||||
Add an external reply link for when the thought has a `replyToUrl` but no local `replyToId`:
|
||||
|
||||
```tsx
|
||||
{thought.replyToId && isReply && (
|
||||
<div className="text-sm text-muted-foreground flex items-center gap-2">
|
||||
<CornerUpLeft className="h-4 w-4 text-primary/70" />
|
||||
<span>
|
||||
Replying to{" "}
|
||||
<Link
|
||||
href={`#${thought.replyToId}`}
|
||||
className="hover:underline text-primary text-shadow-sm"
|
||||
>
|
||||
parent thought
|
||||
</Link>
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
{!thought.replyToId && thought.replyToUrl && (
|
||||
<div className="text-sm text-muted-foreground flex items-center gap-2">
|
||||
<CornerUpLeft className="h-4 w-4 text-primary/70" />
|
||||
<span>
|
||||
Replying to{" "}
|
||||
<a
|
||||
href={thought.replyToUrl}
|
||||
target="_blank"
|
||||
rel="noopener noreferrer"
|
||||
className="hover:underline text-primary text-shadow-sm"
|
||||
>
|
||||
original post ↗
|
||||
</a>
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Type check frontend**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts/thoughts-frontend && npx tsc --noEmit 2>&1 | grep "error TS" | head -5
|
||||
```
|
||||
|
||||
- [ ] **Step 7: Final build + tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error"
|
||||
cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 8: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/api-types/src/responses.rs \
|
||||
thoughts-frontend/lib/api.ts \
|
||||
thoughts-frontend/components/thought-card.tsx
|
||||
git commit -m "feat: expose replyToUrl in API + show external parent link on remote reply posts"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Notes
|
||||
|
||||
- **Task 5 (mentions)**: `self.urls.base_url()` — if `ThoughtsUrls` doesn't expose the base URL as a `Url`, parse it from `self.urls.base_url` string. Check `crates/adapters/activitypub/src/urls.rs` for the exact field.
|
||||
- **Task 6 (feed)**: The embedded UUID in the SQL is a UUID type (hex + hyphens only), safe to format-string without SQL injection risk.
|
||||
- **Task 7 (reply)**: The `to_thought_response` builder might be in `handlers/feed.rs`, `handlers/thoughts.rs`, or a shared module — search the codebase for where `ThoughtResponse` is constructed.
|
||||
- **Profile update (Task 4)**: If tests in `application` call `update_profile` directly, they'll need to pass a `TestStore` as the `events` parameter (TestStore implements EventPublisher).
|
||||
@@ -1,7 +1,7 @@
|
||||
import type { Metadata } from "next";
|
||||
import { notFound } from "next/navigation";
|
||||
import { cookies } from "next/headers";
|
||||
import { getMe, lookupRemoteActor, getRemoteActorPosts, Me } from "@/lib/api";
|
||||
import { getMe, getRemoteFollowing, lookupRemoteActor, getRemoteActorPosts, Me } from "@/lib/api";
|
||||
import { RemoteUserProfile } from "@/components/remote-user-profile";
|
||||
|
||||
interface RemoteActorPageProps {
|
||||
@@ -53,10 +53,11 @@ export default async function RemoteActorPage({
|
||||
|
||||
const token = (await cookies()).get("auth_token")?.value ?? null;
|
||||
|
||||
const [actorResult, postsResult, meResult] = await Promise.allSettled([
|
||||
const [actorResult, postsResult, meResult, followingResult] = await Promise.allSettled([
|
||||
lookupRemoteActor(handle, token),
|
||||
getRemoteActorPosts(handle, 1, token),
|
||||
token ? getMe(token) : Promise.resolve(null),
|
||||
token ? getRemoteFollowing(token) : Promise.resolve([]),
|
||||
]);
|
||||
|
||||
if (actorResult.status === "rejected") {
|
||||
@@ -68,6 +69,17 @@ export default async function RemoteActorPage({
|
||||
postsResult.status === "fulfilled" ? postsResult.value.items : [];
|
||||
const me =
|
||||
meResult.status === "fulfilled" ? (meResult.value as Me | null) : null;
|
||||
const following =
|
||||
followingResult.status === "fulfilled" ? followingResult.value : [];
|
||||
const initialFollowed = following.some((f) => f.url === actor.url);
|
||||
|
||||
return <RemoteUserProfile actor={actor} initialPosts={posts} me={me} />;
|
||||
return (
|
||||
<RemoteUserProfile
|
||||
key={actor.url}
|
||||
actor={actor}
|
||||
initialPosts={posts}
|
||||
me={me}
|
||||
initialFollowed={initialFollowed}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -265,7 +265,7 @@ export default async function ProfilePage({ params }: ProfilePageProps) {
|
||||
id="profile-card__thoughts"
|
||||
className="col-span-1 lg:col-span-3 space-y-4"
|
||||
>
|
||||
<Tabs defaultValue="thoughts">
|
||||
<Tabs key={user.id.toString()} defaultValue="thoughts">
|
||||
<TabsList className="mb-4">
|
||||
<TabsTrigger value="thoughts">Thoughts</TabsTrigger>
|
||||
{isOwnProfile && (
|
||||
|
||||
@@ -59,7 +59,7 @@ export function PendingRequests({ compact = false }: Props) {
|
||||
className="flex items-center justify-between gap-3"
|
||||
>
|
||||
<Link
|
||||
href={`/users/@${actor.handle}`}
|
||||
href={`/users/@${fullFediverseHandle(actor.handle, actor.url)}`}
|
||||
className="flex items-center gap-2 min-w-0 hover:opacity-80"
|
||||
>
|
||||
<UserAvatar
|
||||
|
||||
@@ -39,7 +39,7 @@ export function RemoteFollowers() {
|
||||
{followers.map((actor) => (
|
||||
<li key={actor.url} className="flex items-center justify-between gap-3">
|
||||
<Link
|
||||
href={`/users/@${actor.handle}`}
|
||||
href={`/users/@${fullFediverseHandle(actor.handle, actor.url)}`}
|
||||
className="flex items-center gap-2 min-w-0 hover:opacity-80"
|
||||
>
|
||||
<UserAvatar
|
||||
|
||||
@@ -22,9 +22,10 @@ export function RemoteFollowing() {
|
||||
.finally(() => setLoading(false));
|
||||
}, [token]);
|
||||
|
||||
const unfollow = async (handle: string) => {
|
||||
const unfollow = async (actor: RemoteActor) => {
|
||||
if (!token) return;
|
||||
setFollowing((prev) => prev.filter((f) => f.handle !== handle));
|
||||
const handle = fullFediverseHandle(actor.handle, actor.url);
|
||||
setFollowing((prev) => prev.filter((f) => f.url !== actor.url));
|
||||
await unfollowRemoteActor(handle, token).catch(() => {
|
||||
toast.error("Failed to unfollow");
|
||||
});
|
||||
@@ -39,7 +40,7 @@ export function RemoteFollowing() {
|
||||
{following.map((actor) => (
|
||||
<li key={actor.url} className="flex items-center justify-between gap-3">
|
||||
<Link
|
||||
href={`/users/@${actor.handle}`}
|
||||
href={`/users/@${fullFediverseHandle(actor.handle, actor.url)}`}
|
||||
className="flex items-center gap-2 min-w-0 hover:opacity-80"
|
||||
>
|
||||
<UserAvatar
|
||||
@@ -59,7 +60,7 @@ export function RemoteFollowing() {
|
||||
<Button
|
||||
size="sm"
|
||||
variant="outline"
|
||||
onClick={() => unfollow(actor.handle)}
|
||||
onClick={() => unfollow(actor)}
|
||||
>
|
||||
Unfollow
|
||||
</Button>
|
||||
|
||||
@@ -17,14 +17,16 @@ interface RemoteUserProfileProps {
|
||||
actor: RemoteActor;
|
||||
initialPosts: Thought[];
|
||||
me: Me | null;
|
||||
initialFollowed?: boolean;
|
||||
}
|
||||
|
||||
export function RemoteUserProfile({
|
||||
actor,
|
||||
initialPosts,
|
||||
me,
|
||||
initialFollowed = false,
|
||||
}: RemoteUserProfileProps) {
|
||||
const [followed, setFollowed] = useState(false);
|
||||
const [followed, setFollowed] = useState(initialFollowed);
|
||||
const [loading, setLoading] = useState(false);
|
||||
const { token } = useAuth();
|
||||
|
||||
|
||||
@@ -106,6 +106,22 @@ export function ThoughtCard({
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
{!thought.replyToId && thought.replyToUrl && (
|
||||
<div className="text-sm text-muted-foreground flex items-center gap-2">
|
||||
<CornerUpLeft className="h-4 w-4 text-primary/70" />
|
||||
<span>
|
||||
Replying to{" "}
|
||||
<a
|
||||
href={thought.replyToUrl}
|
||||
target="_blank"
|
||||
rel="noopener noreferrer"
|
||||
className="hover:underline text-primary text-shadow-sm"
|
||||
>
|
||||
original post ↗
|
||||
</a>
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
<Card className="mt-2">
|
||||
<CardHeader className="flex flex-row items-center justify-between space-y-0">
|
||||
|
||||
@@ -17,7 +17,8 @@ export function ThoughtThread({
|
||||
const author = {
|
||||
username: thought.author.username,
|
||||
displayName: thought.author.displayName,
|
||||
...authorDetails.get(thought.author.username),
|
||||
avatarUrl: thought.author.avatarUrl, // API-provided avatar (from DB COALESCE)
|
||||
...authorDetails.get(thought.author.username), // override for local users
|
||||
};
|
||||
|
||||
return (
|
||||
|
||||
@@ -41,6 +41,7 @@ export const ThoughtSchema = z.object({
|
||||
content: z.string(),
|
||||
author: UserSchema,
|
||||
replyToId: z.string().uuid().nullable(),
|
||||
replyToUrl: z.string().url().nullable().optional(),
|
||||
visibility: z.string(),
|
||||
contentWarning: z.string().nullable(),
|
||||
sensitive: z.boolean(),
|
||||
|
||||
Reference in New Issue
Block a user