diff --git a/crates/adapters/activitypub-base/src/activities.rs b/crates/adapters/activitypub-base/src/activities.rs index 203a3a9..2c96fde 100644 --- a/crates/adapters/activitypub-base/src/activities.rs +++ b/crates/adapters/activitypub-base/src/activities.rs @@ -134,6 +134,11 @@ impl Activity for AcceptActivity { } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if self.actor.inner() != self.object.object.inner() { + return Err(Error::bad_request(anyhow::anyhow!( + "Accept actor does not match Follow target" + ))); + } Ok(()) } @@ -184,6 +189,11 @@ impl Activity for RejectActivity { } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if self.actor.inner() != self.object.object.inner() { + return Err(Error::bad_request(anyhow::anyhow!( + "Reject actor does not match Follow target" + ))); + } Ok(()) } @@ -316,6 +326,10 @@ pub struct CreateActivity { pub(crate) to: Vec, #[serde(skip_serializing_if = "Vec::is_empty", default)] pub(crate) cc: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) bto: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) bcc: Vec, } #[async_trait::async_trait] @@ -332,6 +346,14 @@ impl Activity for CreateActivity { } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) + && let Ok(attributed_url) = Url::parse(attributed_to) + && &attributed_url != self.actor.inner() + { + return Err(Error::bad_request(anyhow::anyhow!( + "Create actor does not match object attributedTo" + ))); + } Ok(()) } @@ -389,6 +411,25 @@ impl Activity for DeleteActivity { } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + let actor_domain = self.actor.inner().host_str().unwrap_or(""); + let object_domain = match &self.object { + serde_json::Value::String(s) => Url::parse(s) + .ok() + .and_then(|u| u.host_str().map(|h| h.to_string())) + .unwrap_or_default(), + serde_json::Value::Object(o) => o + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .and_then(|u| u.host_str().map(|h| h.to_string())) + .unwrap_or_default(), + _ => String::new(), + }; + if !object_domain.is_empty() && actor_domain != object_domain { + return Err(Error::bad_request(anyhow::anyhow!( + "Delete actor domain does not match object domain" + ))); + } Ok(()) } @@ -465,6 +506,14 @@ impl Activity for UpdateActivity { } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) + && let Ok(attributed_url) = Url::parse(attributed_to) + && &attributed_url != self.actor.inner() + { + return Err(Error::bad_request(anyhow::anyhow!( + "Update actor does not match object attributedTo" + ))); + } Ok(()) } @@ -474,7 +523,12 @@ impl Activity for UpdateActivity { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); return Ok(()); } - let ap_id = self.id.clone(); + let ap_id = self + .object + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .unwrap_or_else(|| self.id.clone()); let actor_url = self.actor.inner().clone(); data.object_handler .on_update(&ap_id, &actor_url, self.object) @@ -527,6 +581,11 @@ impl Activity for AnnounceActivity { } let object_domain = self.object.host_str().unwrap_or(""); if object_domain != data.domain { + tracing::debug!( + actor = %self.actor.inner(), + object = %self.object, + "received Announce of non-local object — skipped (cross-server boost not supported)" + ); return Ok(()); } data.federation_repo @@ -692,14 +751,69 @@ impl Activity for BlockActivity { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); return Ok(()); } - // They blocked us — remove them from our following list if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { let _ = data .federation_repo .remove_following(local_user_id, self.actor.inner().as_str()) .await; + let _ = data + .federation_repo + .remove_follower(local_user_id, self.actor.inner().as_str()) + .await; } - tracing::info!(actor = %self.actor.inner(), "received block"); + tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower"); + Ok(()) + } +} + +// --- Move (account migration) --- + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename = "Move")] +pub struct MoveType; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct MoveActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: MoveType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, + pub(crate) target: Url, +} + +#[async_trait::async_trait] +impl Activity for MoveActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if &self.object != self.actor.inner() { + return Err(Error::bad_request(anyhow::anyhow!( + "Move object must be the actor itself" + ))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + return Ok(()); + } + tracing::info!( + actor = %self.actor.inner(), + target = %self.target, + "received Move (account migration) — target noted" + ); Ok(()) } } @@ -732,4 +846,6 @@ pub enum InboxActivities { Block(BlockActivity), #[serde(rename = "Like")] Like(LikeActivity), + #[serde(rename = "Move")] + Move(MoveActivity), } diff --git a/crates/adapters/activitypub-base/src/federation.rs b/crates/adapters/activitypub-base/src/federation.rs index 5ccd975..23d59e2 100644 --- a/crates/adapters/activitypub-base/src/federation.rs +++ b/crates/adapters/activitypub-base/src/federation.rs @@ -33,7 +33,6 @@ impl ApFederationConfig { .domain(&data.domain) .app_data(data) .debug(false) - .http_signature_compat(true) .build() .await? }; diff --git a/crates/adapters/activitypub-base/src/outbox.rs b/crates/adapters/activitypub-base/src/outbox.rs index 327477b..80f004e 100644 --- a/crates/adapters/activitypub-base/src/outbox.rs +++ b/crates/adapters/activitypub-base/src/outbox.rs @@ -87,6 +87,8 @@ pub async fn outbox_handler( object, to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![followers_url.clone()], + bto: vec![], + bcc: vec![], })) .expect("serializable") }) diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index 331dff6..52ec70a 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -1373,6 +1373,8 @@ impl ActivityPubService { object: object_json.clone(), to: vec![], cc: vec![], + bto: vec![], + bcc: vec![], }; let sends = SendActivityTask::prepare( @@ -1436,6 +1438,8 @@ impl domain::ports::OutboundFederationPort for ActivityPubService { object: note, to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], + bto: vec![], + bcc: vec![], }; let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( &activitypub_federation::protocol::context::WithContext::new_default(create), diff --git a/crates/adapters/activitypub/src/handler.rs b/crates/adapters/activitypub/src/handler.rs index 70d0bbb..fdda294 100644 --- a/crates/adapters/activitypub/src/handler.rs +++ b/crates/adapters/activitypub/src/handler.rs @@ -261,7 +261,44 @@ impl ApObjectHandler for ThoughtsObjectHandler { Ok(()) } - async fn on_unlike(&self, _object_url: &url::Url, _actor_url: &url::Url) -> anyhow::Result<()> { + async fn on_unlike(&self, object_url: &url::Url, actor_url: &url::Url) -> anyhow::Result<()> { + let thought_uuid = object_url + .path() + .strip_prefix(THOUGHTS_PATH_PREFIX) + .and_then(|s| s.split('/').next()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + + let thought_uuid = match thought_uuid { + Some(u) => u, + None => { + tracing::debug!(object = %object_url, "on_unlike: not a local thought URL, skipping"); + return Ok(()); + } + }; + + let actor_user_id = self + .repo + .find_remote_actor_id(actor_url) + .await + .map_err(|e| anyhow!("{e}"))?; + + let actor_user_id = match actor_user_id { + Some(id) => id, + None => { + tracing::debug!(actor = %actor_url, "on_unlike: remote actor not interned, skipping"); + return Ok(()); + } + }; + + if let Some(ep) = &self.event_publisher { + ep.publish(&domain::events::DomainEvent::LikeRemoved { + user_id: actor_user_id, + thought_id: domain::value_objects::ThoughtId::from_uuid(thought_uuid), + }) + .await + .map_err(|e| anyhow!("{e}"))?; + } + Ok(()) } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index 7d37136..b1c3d5e 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -117,7 +117,7 @@ impl FederationRepository for PostgresFederationRepository { COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url FROM federation_followers f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url - WHERE f.local_user_id=$1" + WHERE f.local_user_id=$1 AND f.status='accepted'" ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| Follower { actor: map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url), status: str_status(&r.status), @@ -276,7 +276,7 @@ impl FederationRepository for PostgresFederationRepository { COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url FROM federation_following f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url - WHERE f.local_user_id=$1 + WHERE f.local_user_id=$1 AND f.status='accepted' ORDER BY f.created_at DESC LIMIT $2 OFFSET $3" ).bind(local_user_id).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) @@ -285,7 +285,7 @@ impl FederationRepository for PostgresFederationRepository { async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { let n: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM federation_following WHERE local_user_id=$1") + sqlx::query_scalar("SELECT COUNT(*) FROM federation_following WHERE local_user_id=$1 AND status='accepted'") .bind(local_user_id) .fetch_one(&self.pool) .await