diff --git a/src/activities.rs b/src/activities.rs deleted file mode 100644 index df32ab2..0000000 --- a/src/activities.rs +++ /dev/null @@ -1,1083 +0,0 @@ -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::{ - AcceptType, CreateType, DeleteType, FollowType, RejectType, UndoType, UpdateType, - }, - protocol::verification::verify_domains_match, - traits::Activity, -}; -use serde::{Deserialize, Serialize}; -use url::Url; - -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -#[serde(rename = "Announce")] -pub struct AnnounceType; - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename = "Like")] -pub struct LikeType; - -impl Default for LikeType { - fn default() -> Self { - Self - } -} - -use crate::actors::DbActor; -use crate::data::FederationData; -use crate::error::Error; -use crate::repository::{FollowerStatus, FollowingStatus}; - -// --- Follow --- - -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct FollowActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: FollowType, - pub(crate) actor: ObjectId, - pub(crate) object: ObjectId, -} - -#[async_trait::async_trait] -impl Activity for FollowActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, data: &Data) -> Result<(), Self::Error> { - let target_url = self.object.inner(); - let target_domain = match (target_url.host_str(), target_url.port()) { - (Some(host), Some(port)) => format!("{}:{}", host, port), - (Some(host), None) => host.to_string(), - _ => { - return Err(Error::bad_request(anyhow::anyhow!( - "invalid follow target URL" - ))); - } - }; - if target_domain == data.domain { - return Ok(()); - } - // Domain mismatch — still accept if the UUID resolves to a local user. - // This handles domain migrations where remote servers have cached the old actor URL. - if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url) { - if data.user_repo.find_by_id(uuid).await.ok().flatten().is_some() { - tracing::debug!(target = %target_url, local_domain = %data.domain, "accepting follow for migrated actor URL"); - return Ok(()); - } - } - Err(Error::bad_request(anyhow::anyhow!( - "follow target is not a local actor" - ))) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let actor_url = self.actor.inner(); - let domain = actor_url.host_str().unwrap_or(""); - - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %actor_url, "ignoring follow from blocked domain"); - return Ok(()); - } - - // Check per-actor block BEFORE issuing any outbound HTTP request. - // We can derive the target user ID from the follow object URL without dereferencing. - if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) { - if data - .federation_repo - .is_actor_blocked(target_user_id, actor_url.as_str()) - .await? - { - tracing::info!(actor = %actor_url, "ignoring follow from blocked actor"); - return Ok(()); - } - } - - let _follower = self.actor.dereference(data).await?; - let local_actor = self.object.dereference(data).await?; - - data.federation_repo - .add_follower( - local_actor.user_id, - actor_url.as_str(), - FollowerStatus::Pending, - self.id.as_str(), - ) - .await?; - - tracing::info!( - follower = %actor_url, - local_user = %local_actor.user_id, - "follow request pending approval" - ); - Ok(()) - } -} - -// --- Accept --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AcceptActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: AcceptType, - pub(crate) actor: ObjectId, - pub(crate) object: FollowActivity, -} - -#[async_trait::async_trait] -impl Activity for AcceptActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if self.actor.inner() != self.object.object.inner() { - return Err(Error::bad_request(anyhow::anyhow!( - "Accept actor does not match Follow target" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner()) - .ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?; - data.federation_repo - .update_following_status( - local_user_id, - self.actor.inner().as_str(), - FollowingStatus::Accepted, - ) - .await?; - - tracing::info!(remote_actor = %self.actor.inner(), "follow accepted by remote"); - Ok(()) - } -} - -// --- Reject --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct RejectActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: RejectType, - pub(crate) actor: ObjectId, - pub(crate) object: FollowActivity, -} - -#[async_trait::async_trait] -impl Activity for RejectActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if self.actor.inner() != self.object.object.inner() { - return Err(Error::bad_request(anyhow::anyhow!( - "Reject actor does not match Follow target" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) { - data.federation_repo - .remove_following(user_id, self.actor.inner().as_str()) - .await?; - } - tracing::info!(actor = %self.actor.inner(), "follow rejected"); - Ok(()) - } -} - -// --- Undo --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct UndoActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: UndoType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, -} - -#[async_trait::async_trait] -impl Activity for UndoActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - // The actor undoing must be the same as the actor in the wrapped activity. - if let Some(inner_actor) = self.object.get("actor").and_then(|v| v.as_str()) - && inner_actor != self.actor.inner().as_str() - { - return Err(Error::bad_request(anyhow::anyhow!( - "Undo actor does not match inner activity actor" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring Undo from blocked domain"); - return Ok(()); - } - - let obj_type = self - .object - .get("type") - .and_then(|t| t.as_str()) - .unwrap_or(""); - - match obj_type { - "Follow" => { - if let Some(obj_url) = self.object.get("object").and_then(|o| o.as_str()) - && let Ok(url) = Url::parse(obj_url) - && let Some(user_id) = crate::urls::extract_user_id_from_url(&url) - { - data.federation_repo - .remove_follower(user_id, self.actor.inner().as_str()) - .await?; - } - data.object_handler - .on_actor_removed(self.actor.inner()) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %self.actor.inner(), "unfollowed"); - } - "Add" => { - let ap_id_str = self - .object - .get("object") - .and_then(|o| o.get("id")) - .and_then(|id| id.as_str()) - .or_else(|| self.object.get("id").and_then(|id| id.as_str())); - - if let Some(ap_id_str) = ap_id_str - && let Ok(ap_id) = Url::parse(ap_id_str) - { - data.object_handler - .on_delete(&ap_id, self.actor.inner()) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(ap_id = %ap_id_str, "undo Add (watchlist remove)"); - } - } - "Like" => { - if let Some(obj_url_str) = self.object.get("object").and_then(|o| o.as_str()) - && let Ok(obj_url) = Url::parse(obj_url_str) - && obj_url.host_str().unwrap_or("") == data.domain - { - data.object_handler - .on_unlike(&obj_url, self.actor.inner()) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "failed to process unlike"); - }); - } - tracing::info!(actor = %self.actor.inner(), "received Undo(Like)"); - } - other => { - tracing::debug!(kind = %other, "ignoring Undo of unknown activity type"); - } - } - - Ok(()) - } -} - -// --- Create --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: CreateType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) bto: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) bcc: Vec, -} - -#[async_trait::async_trait] -impl Activity for CreateActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) - && let Ok(attributed_url) = Url::parse(attributed_to) - && &attributed_url != self.actor.inner() - { - return Err(Error::bad_request(anyhow::anyhow!( - "Create actor does not match object attributedTo" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - // Use the Note's own id, not the Create activity id (which ends in /activity). - // Delete activities reference the Note id, so they must match. - let ap_id = self - .object - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| Url::parse(s).ok()) - .unwrap_or_else(|| self.id.clone()); - let actor_url = self.actor.inner().clone(); - - // Extract Mention tags and notify local users. - extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await; - - data.object_handler - .on_create(&ap_id, &actor_url, self.object) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %actor_url, "received create activity"); - Ok(()) - } -} - -// --- Delete --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct DeleteActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: DeleteType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, -} - -#[async_trait::async_trait] -impl Activity for DeleteActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - let actor_domain = self.actor.inner().host_str().unwrap_or(""); - let object_domain = match &self.object { - serde_json::Value::String(s) => Url::parse(s) - .ok() - .and_then(|u| u.host_str().map(|h| h.to_string())) - .unwrap_or_default(), - serde_json::Value::Object(o) => o - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| Url::parse(s).ok()) - .and_then(|u| u.host_str().map(|h| h.to_string())) - .unwrap_or_default(), - _ => String::new(), - }; - if !object_domain.is_empty() && actor_domain != object_domain { - return Err(Error::bad_request(anyhow::anyhow!( - "Delete actor domain does not match object domain" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let actor_url = self.actor.inner().clone(); - - // Extract object URL — handles plain string and Tombstone {"id":"...","type":"Tombstone"} - let object_url_str = match &self.object { - serde_json::Value::String(s) => s.clone(), - serde_json::Value::Object(o) => o - .get("id") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - .unwrap_or_default(), - _ => String::new(), - }; - let Ok(object_url) = Url::parse(&object_url_str) else { - tracing::warn!(actor = %actor_url, "Delete activity has unparseable object, ignoring"); - return Ok(()); - }; - - // Actor self-deletion: Mastodon sends Delete(actor_url) when an account is deleted. - if object_url == *self.actor.inner() { - data.object_handler - .on_actor_removed(&actor_url) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %actor_url, "received Delete(actor) — remote account deleted"); - return Ok(()); - } - - // Normal note deletion. - data.object_handler - .on_delete(&object_url, &actor_url) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(object = %object_url, "received Delete(note)"); - Ok(()) - } -} - -// --- Update --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct UpdateActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: UpdateType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, -} - -#[async_trait::async_trait] -impl Activity for UpdateActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) - && let Ok(attributed_url) = Url::parse(attributed_to) - && &attributed_url != self.actor.inner() - { - return Err(Error::bad_request(anyhow::anyhow!( - "Update actor does not match object attributedTo" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let ap_id = self - .object - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| Url::parse(s).ok()) - .unwrap_or_else(|| self.id.clone()); - let actor_url = self.actor.inner().clone(); - - // Re-extract mentions on update so newly-added mentions are notified. - extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await; - - data.object_handler - .on_update(&ap_id, &actor_url, self.object) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %actor_url, "received update activity"); - Ok(()) - } -} - -// --- Announce --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AnnounceActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: AnnounceType, - pub(crate) actor: ObjectId, - pub(crate) object: Url, - pub(crate) published: Option>, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, -} - -#[async_trait::async_trait] -impl Activity for AnnounceActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - verify_domains_match(&self.id, self.actor.inner())?; - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let object_domain = self.object.host_str().unwrap_or(""); - if object_domain != data.domain { - // Cross-server boost: notify the handler so consumers can surface it. - data.object_handler - .on_announce_of_remote(&self.object, self.actor.inner()) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "failed to process cross-server announce"); - }); - tracing::debug!( - actor = %self.actor.inner(), - object = %self.object, - "received Announce of non-local object" - ); - return Ok(()); - } - data.federation_repo - .add_announce( - self.id.as_str(), - self.object.as_str(), - self.actor.inner().as_str(), - self.published.unwrap_or_else(chrono::Utc::now), - ) - .await?; - data.object_handler - .on_announce_received(&self.object, self.actor.inner()) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "failed to process announce notification"); - }); - tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce"); - Ok(()) - } -} - -// --- Like --- - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct LikeActivity { - pub id: Url, - #[serde(rename = "type")] - pub kind: LikeType, - pub actor: ObjectId, - pub object: Url, -} - -#[async_trait::async_trait] -impl Activity for LikeActivity { - type DataType = FederationData; - type Error = crate::error::Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - verify_domains_match(&self.id, self.actor.inner())?; - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain"); - return Ok(()); - } - - // Only process if the liked object is on our instance. - if self.object.host_str().unwrap_or("") != data.domain { - return Ok(()); - } - - data.object_handler - .on_like(&self.object, self.actor.inner()) - .await - .map_err(|e| crate::error::Error::from(anyhow::anyhow!(e)))?; - - tracing::info!(actor = %self.actor.inner(), object = %self.object, "received like"); - Ok(()) - } -} - -// --- Add --- - -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -#[serde(rename = "Add")] -pub struct AddType; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AddActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: AddType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, -} - -#[async_trait::async_trait] -impl Activity for AddActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) - && let Ok(attributed_url) = Url::parse(attributed_to) - && &attributed_url != self.actor.inner() - { - return Err(Error::bad_request(anyhow::anyhow!( - "Add actor does not match object attributedTo" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring Add from blocked domain"); - return Ok(()); - } - let ap_id = self.id.clone(); - let actor_url = self.actor.inner().clone(); - data.object_handler - .on_create(&ap_id, &actor_url, self.object) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %actor_url, "received Add activity"); - Ok(()) - } -} - -// --- Block --- - -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -#[serde(rename = "Block")] -pub struct BlockType; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct BlockActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: BlockType, - pub(crate) actor: ObjectId, - pub(crate) object: Url, -} - -#[async_trait::async_trait] -impl Activity for BlockActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - verify_domains_match(&self.id, self.actor.inner())?; - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { - let _ = data - .federation_repo - .remove_following(local_user_id, self.actor.inner().as_str()) - .await; - let _ = data - .federation_repo - .remove_follower(local_user_id, self.actor.inner().as_str()) - .await; - } - tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower"); - Ok(()) - } -} - -// --- Move (account migration) --- - -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -#[serde(rename = "Move")] -pub struct MoveType; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct MoveActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: MoveType, - pub(crate) actor: ObjectId, - pub(crate) object: Url, - pub(crate) target: Url, -} - -#[async_trait::async_trait] -impl Activity for MoveActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if &self.object != self.actor.inner() { - return Err(Error::bad_request(anyhow::anyhow!( - "Move object must be the actor itself" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - if already_processed(&self.id, data).await { - return Ok(()); - } - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - return Ok(()); - } - - // Fetch the target actor via signed request. - let target = ObjectId::::from(self.target.clone()) - .dereference(data) - .await - .map_err(|e| Error::from(anyhow::anyhow!("{e}")))?; - - // Verify the new actor claims the old identity via alsoKnownAs. - let old_url = self.object.as_str(); - if target.also_known_as.as_deref() != Some(old_url) { - return Err(Error::bad_request(anyhow::anyhow!( - "Move target alsoKnownAs does not reference old actor" - ))); - } - - // Migrate DB records; get user IDs that need a re-follow. - let affected = data - .federation_repo - .migrate_follower_actor(old_url, self.target.as_str()) - .await - .map_err(|e| Error::from(anyhow::anyhow!("{e}")))?; - - let affected_count = affected.len(); - - // Re-follow on behalf of each affected local user. - for local_user_id in &affected { - let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await { - Ok(a) => a, - Err(e) => { - tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor for re-follow"); - continue; - } - }; - - let follow_id = match crate::urls::activity_url(&data.base_url) { - Ok(u) => u, - Err(e) => { - tracing::warn!(error = %e, "Move: failed to generate follow activity URL"); - continue; - } - }; - - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: activitypub_federation::fetch::object_id::ObjectId::from( - self.target.clone(), - ), - }; - - let sends = match activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(follow), - &local_actor, - vec![target.inbox_url.clone()], - data, - ) - .await - { - Ok(s) => s, - Err(e) => { - tracing::warn!(error = %e, "Move: failed to prepare re-follow"); - continue; - } - }; - - for send in sends { - if let Err(e) = send.sign_and_send(data).await { - tracing::warn!(error = %e, %local_user_id, "Move: re-follow delivery failed"); - } - } - } - - tracing::info!( - actor = %self.actor.inner(), - target = %self.target, - affected = affected_count, - "received Move — migrated follower relationships" - ); - Ok(()) - } -} - -// --- Idempotency guard --- - -/// Returns `true` if the activity was already processed (caller should return `Ok(())`). -/// Marks the activity as processed before returning `false`. -/// On any repository error the check is skipped to avoid silently dropping activities. -async fn already_processed(activity_id: &Url, data: &Data) -> bool { - let id = activity_id.as_str(); - match data.federation_repo.is_activity_processed(id).await { - Ok(true) => { - tracing::debug!(activity_id = id, "duplicate activity, skipping"); - return true; - } - Ok(false) => { - if let Err(e) = data.federation_repo.mark_activity_processed(id).await { - tracing::warn!(activity_id = id, error = %e, "failed to mark activity processed"); - } - } - Err(e) => { - tracing::warn!(error = %e, "idempotency check failed, processing anyway"); - } - } - false -} - -// --- Mention extraction --- - -/// Parse `object["tag"]` for Mention entries and call `on_mention` for each -/// local user that is tagged. Failures are logged but never propagated — a -/// broken mention notification must not fail the entire activity. -async fn extract_and_dispatch_mentions( - ap_id: &Url, - actor_url: &Url, - object: &serde_json::Value, - data: &Data, -) { - let tags = match object.get("tag").and_then(|t| t.as_array()) { - Some(t) => t, - None => return, - }; - for tag in tags { - let tag_type = tag.get("type").and_then(|v| v.as_str()).unwrap_or(""); - if tag_type != "Mention" { - continue; - } - let href = match tag.get("href").and_then(|v| v.as_str()) { - Some(h) => h, - None => continue, - }; - let Ok(href_url) = Url::parse(href) else { continue }; - - // Only dispatch for local actors. - let Some(mentioned_user_id) = crate::urls::extract_user_id_from_url(&href_url) else { - continue; - }; - - if let Err(e) = data - .object_handler - .on_mention(ap_id, mentioned_user_id, actor_url) - .await - { - tracing::warn!( - ap_id = %ap_id, - mentioned_user = %mentioned_user_id, - error = %e, - "failed to dispatch mention notification" - ); - } - } -} - -// --- Inbox dispatch enum --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(tag = "type")] -#[enum_delegate::implement(Activity)] -pub enum InboxActivities { - #[serde(rename = "Follow")] - Follow(FollowActivity), - #[serde(rename = "Accept")] - Accept(AcceptActivity), - #[serde(rename = "Reject")] - Reject(RejectActivity), - #[serde(rename = "Undo")] - Undo(UndoActivity), - #[serde(rename = "Create")] - Create(CreateActivity), - #[serde(rename = "Delete")] - Delete(DeleteActivity), - #[serde(rename = "Update")] - Update(UpdateActivity), - #[serde(rename = "Announce")] - Announce(AnnounceActivity), - #[serde(rename = "Add")] - Add(AddActivity), - #[serde(rename = "Block")] - Block(BlockActivity), - #[serde(rename = "Like")] - Like(LikeActivity), - #[serde(rename = "Move")] - Move(MoveActivity), -} diff --git a/src/activities/accept.rs b/src/activities/accept.rs new file mode 100644 index 0000000..1d3b9d5 --- /dev/null +++ b/src/activities/accept.rs @@ -0,0 +1,59 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::AcceptType, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; +use crate::repository::FollowingStatus; + +use super::follow::FollowActivity; +use super::helpers::check_guards; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AcceptActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: AcceptType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for AcceptActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if self.actor.inner() != self.object.object.inner() { + return Err(Error::bad_request(anyhow::anyhow!("Accept actor does not match Follow target"))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner()) + .ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?; + data.federation_repo + .update_following_status( + local_user_id, + self.actor.inner().as_str(), + FollowingStatus::Accepted, + ) + .await?; + tracing::info!(remote_actor = %self.actor.inner(), "follow accepted by remote"); + Ok(()) + } +} diff --git a/src/activities/add.rs b/src/activities/add.rs new file mode 100644 index 0000000..0704e96 --- /dev/null +++ b/src/activities/add.rs @@ -0,0 +1,66 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::helpers::check_guards; + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename = "Add")] +pub struct AddType; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AddActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: AddType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, +} + +#[async_trait::async_trait] +impl Activity for AddActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) + && let Ok(attributed_url) = Url::parse(attributed_to) + && &attributed_url != self.actor.inner() + { + return Err(Error::bad_request(anyhow::anyhow!( + "Add actor does not match object attributedTo" + ))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + let ap_id = self.id.clone(); + let actor_url = self.actor.inner().clone(); + data.object_handler + .on_create(&ap_id, &actor_url, self.object) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received Add activity"); + Ok(()) + } +} diff --git a/src/activities/announce.rs b/src/activities/announce.rs new file mode 100644 index 0000000..e822035 --- /dev/null +++ b/src/activities/announce.rs @@ -0,0 +1,75 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + protocol::verification::verify_domains_match, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::helpers::check_guards; + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename = "Announce")] +pub struct AnnounceType; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AnnounceActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: AnnounceType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, + pub(crate) published: Option>, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, +} + +#[async_trait::async_trait] +impl Activity for AnnounceActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + verify_domains_match(&self.id, self.actor.inner())?; + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + if self.object.host_str().unwrap_or("") != data.domain { + data.object_handler + .on_announce_of_remote(&self.object, self.actor.inner()) + .await + .unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process cross-server announce")); + tracing::debug!(actor = %self.actor.inner(), object = %self.object, "received Announce of non-local object"); + return Ok(()); + } + data.federation_repo + .add_announce( + self.id.as_str(), + self.object.as_str(), + self.actor.inner().as_str(), + self.published.unwrap_or_else(chrono::Utc::now), + ) + .await?; + data.object_handler + .on_announce_received(&self.object, self.actor.inner()) + .await + .unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process announce notification")); + tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce"); + Ok(()) + } +} diff --git a/src/activities/block.rs b/src/activities/block.rs new file mode 100644 index 0000000..2fb355e --- /dev/null +++ b/src/activities/block.rs @@ -0,0 +1,54 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + protocol::verification::verify_domains_match, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::helpers::check_guards; + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename = "Block")] +pub struct BlockType; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BlockActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: BlockType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, +} + +#[async_trait::async_trait] +impl Activity for BlockActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + verify_domains_match(&self.id, self.actor.inner())?; + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { + let _ = data.federation_repo.remove_following(local_user_id, self.actor.inner().as_str()).await; + let _ = data.federation_repo.remove_follower(local_user_id, self.actor.inner().as_str()).await; + } + tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower"); + Ok(()) + } +} diff --git a/src/activities/create.rs b/src/activities/create.rs new file mode 100644 index 0000000..9fcc132 --- /dev/null +++ b/src/activities/create.rs @@ -0,0 +1,73 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::CreateType, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::helpers::{check_guards, extract_and_dispatch_mentions}; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: CreateType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) bto: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) bcc: Vec, +} + +#[async_trait::async_trait] +impl Activity for CreateActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) + && let Ok(attributed_url) = Url::parse(attributed_to) + && &attributed_url != self.actor.inner() + { + return Err(Error::bad_request(anyhow::anyhow!( + "Create actor does not match object attributedTo" + ))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + let ap_id = self + .object + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .unwrap_or_else(|| self.id.clone()); + let actor_url = self.actor.inner().clone(); + extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await; + data.object_handler + .on_create(&ap_id, &actor_url, self.object) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received create activity"); + Ok(()) + } +} diff --git a/src/activities/delete.rs b/src/activities/delete.rs new file mode 100644 index 0000000..091c8c1 --- /dev/null +++ b/src/activities/delete.rs @@ -0,0 +1,94 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::DeleteType, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::helpers::check_guards; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DeleteActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: DeleteType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, +} + +#[async_trait::async_trait] +impl Activity for DeleteActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + let actor_domain = self.actor.inner().host_str().unwrap_or(""); + let object_domain = match &self.object { + serde_json::Value::String(s) => Url::parse(s) + .ok() + .and_then(|u| u.host_str().map(|h| h.to_string())) + .unwrap_or_default(), + serde_json::Value::Object(o) => o + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .and_then(|u| u.host_str().map(|h| h.to_string())) + .unwrap_or_default(), + _ => String::new(), + }; + if !object_domain.is_empty() && actor_domain != object_domain { + return Err(Error::bad_request(anyhow::anyhow!( + "Delete actor domain does not match object domain" + ))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + let actor_url = self.actor.inner().clone(); + let object_url_str = match &self.object { + serde_json::Value::String(s) => s.clone(), + serde_json::Value::Object(o) => o + .get("id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_default(), + _ => String::new(), + }; + let Ok(object_url) = Url::parse(&object_url_str) else { + tracing::warn!(actor = %actor_url, "Delete has unparseable object, ignoring"); + return Ok(()); + }; + if object_url == *self.actor.inner() { + data.object_handler + .on_actor_removed(&actor_url) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received Delete(actor) — remote account deleted"); + return Ok(()); + } + data.object_handler + .on_delete(&object_url, &actor_url) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(object = %object_url, "received Delete(note)"); + Ok(()) + } +} diff --git a/src/activities/follow.rs b/src/activities/follow.rs new file mode 100644 index 0000000..c073b92 --- /dev/null +++ b/src/activities/follow.rs @@ -0,0 +1,85 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::FollowType, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; +use crate::repository::FollowerStatus; + +use super::helpers::check_guards; + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FollowActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: FollowType, + pub(crate) actor: ObjectId, + pub(crate) object: ObjectId, +} + +#[async_trait::async_trait] +impl Activity for FollowActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, data: &Data) -> Result<(), Self::Error> { + let target_url = self.object.inner(); + let target_domain = match (target_url.host_str(), target_url.port()) { + (Some(host), Some(port)) => format!("{}:{}", host, port), + (Some(host), None) => host.to_string(), + _ => return Err(Error::bad_request(anyhow::anyhow!("invalid follow target URL"))), + }; + if target_domain == data.domain { + return Ok(()); + } + if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url) { + if data.user_repo.find_by_id(uuid).await.ok().flatten().is_some() { + tracing::debug!(target = %target_url, "accepting follow for migrated actor URL"); + return Ok(()); + } + } + Err(Error::bad_request(anyhow::anyhow!("follow target is not a local actor"))) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + // Actor block checked BEFORE any outbound HTTP fetch. + if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) { + if data.federation_repo + .is_actor_blocked(target_user_id, self.actor.inner().as_str()) + .await? + { + tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor"); + return Ok(()); + } + } + let _follower = self.actor.dereference(data).await?; + let local_actor = self.object.dereference(data).await?; + data.federation_repo + .add_follower( + local_actor.user_id, + self.actor.inner().as_str(), + FollowerStatus::Pending, + self.id.as_str(), + ) + .await?; + tracing::info!( + follower = %self.actor.inner(), + local_user = %local_actor.user_id, + "follow request pending approval" + ); + Ok(()) + } +} diff --git a/src/activities/helpers.rs b/src/activities/helpers.rs new file mode 100644 index 0000000..8347be5 --- /dev/null +++ b/src/activities/helpers.rs @@ -0,0 +1,84 @@ +use activitypub_federation::config::Data; +use url::Url; + +use crate::data::FederationData; +use crate::error::Error; + +/// Returns `true` if the activity was already processed. +/// Marks it processed before returning `false`. +/// On repo error, skips the check rather than silently dropping the activity. +pub(crate) async fn already_processed(activity_id: &Url, data: &Data) -> bool { + let id = activity_id.as_str(); + match data.federation_repo.is_activity_processed(id).await { + Ok(true) => { + tracing::debug!(activity_id = id, "duplicate activity, skipping"); + true + } + Ok(false) => { + if let Err(e) = data.federation_repo.mark_activity_processed(id).await { + tracing::warn!(activity_id = id, error = %e, "failed to mark activity processed"); + } + false + } + Err(e) => { + tracing::warn!(error = %e, "idempotency check failed, processing anyway"); + false + } + } +} + +/// Returns `true` when the activity should be skipped: +/// already processed, or the sender's domain is blocked. +/// Call this at the top of every `receive()` impl. +pub(crate) async fn check_guards( + id: &Url, + actor: &Url, + data: &Data, +) -> Result { + if already_processed(id, data).await { + return Ok(true); + } + let domain = actor.host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %actor, "ignoring activity from blocked domain"); + return Ok(true); + } + Ok(false) +} + +/// Parse `object["tag"]` for `Mention` entries and notify each tagged local user. +/// Failures are logged and never propagated — a broken mention must not fail the activity. +pub(crate) async fn extract_and_dispatch_mentions( + ap_id: &Url, + actor_url: &Url, + object: &serde_json::Value, + data: &Data, +) { + let Some(tags) = object.get("tag").and_then(|t| t.as_array()) else { + return; + }; + for tag in tags { + if tag.get("type").and_then(|v| v.as_str()) != Some("Mention") { + continue; + } + let Some(href) = tag.get("href").and_then(|v| v.as_str()) else { + continue; + }; + let Ok(href_url) = Url::parse(href) else { continue }; + let Some(mentioned_user_id) = crate::urls::extract_user_id_from_url(&href_url) else { + continue; + }; + if let Err(e) = data + .object_handler + .on_mention(ap_id, mentioned_user_id, actor_url) + .await + { + tracing::warn!( + ap_id = %ap_id, + mentioned_user = %mentioned_user_id, + error = %e, + "failed to dispatch mention notification" + ); + } + } +} diff --git a/src/activities/like.rs b/src/activities/like.rs new file mode 100644 index 0000000..2f47334 --- /dev/null +++ b/src/activities/like.rs @@ -0,0 +1,61 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + protocol::verification::verify_domains_match, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::helpers::check_guards; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename = "Like")] +pub struct LikeType; + +impl Default for LikeType { + fn default() -> Self { Self } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LikeActivity { + pub id: Url, + #[serde(rename = "type")] + pub kind: LikeType, + pub actor: ObjectId, + pub object: Url, +} + +#[async_trait::async_trait] +impl Activity for LikeActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + verify_domains_match(&self.id, self.actor.inner())?; + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + if self.object.host_str().unwrap_or("") != data.domain { + return Ok(()); + } + data.object_handler + .on_like(&self.object, self.actor.inner()) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %self.actor.inner(), object = %self.object, "received like"); + Ok(()) + } +} diff --git a/src/activities/mod.rs b/src/activities/mod.rs new file mode 100644 index 0000000..f75217f --- /dev/null +++ b/src/activities/mod.rs @@ -0,0 +1,60 @@ +mod accept; +mod add; +mod announce; +mod block; +mod create; +mod delete; +mod follow; +pub(crate) mod helpers; +mod like; +mod move_act; +mod reject; +mod undo; +mod update; + +pub use accept::AcceptActivity; +pub use add::{AddActivity, AddType}; +pub use announce::{AnnounceActivity, AnnounceType}; +pub use block::{BlockActivity, BlockType}; +pub use create::CreateActivity; +pub use delete::DeleteActivity; +pub use follow::FollowActivity; +pub use like::{LikeActivity, LikeType}; +pub use move_act::{MoveActivity, MoveType}; +pub use reject::RejectActivity; +pub use undo::UndoActivity; +pub use update::UpdateActivity; + +use activitypub_federation::config::Data; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "type")] +#[enum_delegate::implement(activitypub_federation::traits::Activity)] +pub enum InboxActivities { + #[serde(rename = "Follow")] + Follow(FollowActivity), + #[serde(rename = "Accept")] + Accept(AcceptActivity), + #[serde(rename = "Reject")] + Reject(RejectActivity), + #[serde(rename = "Undo")] + Undo(UndoActivity), + #[serde(rename = "Create")] + Create(CreateActivity), + #[serde(rename = "Delete")] + Delete(DeleteActivity), + #[serde(rename = "Update")] + Update(UpdateActivity), + #[serde(rename = "Announce")] + Announce(AnnounceActivity), + #[serde(rename = "Add")] + Add(AddActivity), + #[serde(rename = "Block")] + Block(BlockActivity), + #[serde(rename = "Like")] + Like(LikeActivity), + #[serde(rename = "Move")] + Move(MoveActivity), +} diff --git a/src/activities/move_act.rs b/src/activities/move_act.rs new file mode 100644 index 0000000..29ca4fd --- /dev/null +++ b/src/activities/move_act.rs @@ -0,0 +1,105 @@ +use activitypub_federation::{ + activity_sending::SendActivityTask, + config::Data, + fetch::object_id::ObjectId, + protocol::context::WithContext, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::follow::FollowActivity; +use super::helpers::check_guards; + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename = "Move")] +pub struct MoveType; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct MoveActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: MoveType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, + pub(crate) target: Url, +} + +#[async_trait::async_trait] +impl Activity for MoveActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if &self.object != self.actor.inner() { + return Err(Error::bad_request(anyhow::anyhow!("Move object must be the actor itself"))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + let target = ObjectId::::from(self.target.clone()) + .dereference(data) + .await + .map_err(|e| Error::from(anyhow::anyhow!("{e}")))?; + if target.also_known_as.as_deref() != Some(self.object.as_str()) { + return Err(Error::bad_request(anyhow::anyhow!( + "Move target alsoKnownAs does not reference old actor" + ))); + } + let affected = data + .federation_repo + .migrate_follower_actor(self.object.as_str(), self.target.as_str()) + .await + .map_err(|e| Error::from(anyhow::anyhow!("{e}")))?; + let affected_count = affected.len(); + for local_user_id in &affected { + let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await { + Ok(a) => a, + Err(e) => { tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor"); continue; } + }; + let follow_id = match crate::urls::activity_url(&data.base_url) { + Ok(u) => u, + Err(e) => { tracing::warn!(error = %e, "Move: failed to generate follow activity URL"); continue; } + }; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(self.target.clone()), + }; + let sends = match SendActivityTask::prepare( + &WithContext::new_default(follow), + &local_actor, + vec![target.inbox_url.clone()], + data, + ).await { + Ok(s) => s, + Err(e) => { tracing::warn!(error = %e, "Move: failed to prepare re-follow"); continue; } + }; + for send in sends { + if let Err(e) = send.sign_and_send(data).await { + tracing::warn!(error = %e, %local_user_id, "Move: re-follow delivery failed"); + } + } + } + tracing::info!( + actor = %self.actor.inner(), + target = %self.target, + affected = affected_count, + "received Move — migrated follower relationships" + ); + Ok(()) + } +} diff --git a/src/activities/reject.rs b/src/activities/reject.rs new file mode 100644 index 0000000..7c49f20 --- /dev/null +++ b/src/activities/reject.rs @@ -0,0 +1,54 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::RejectType, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::follow::FollowActivity; +use super::helpers::check_guards; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RejectActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: RejectType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for RejectActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if self.actor.inner() != self.object.object.inner() { + return Err(Error::bad_request(anyhow::anyhow!("Reject actor does not match Follow target"))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) { + data.federation_repo + .remove_following(user_id, self.actor.inner().as_str()) + .await?; + } + tracing::info!(actor = %self.actor.inner(), "follow rejected"); + Ok(()) + } +} diff --git a/src/activities/undo.rs b/src/activities/undo.rs new file mode 100644 index 0000000..fd4ff9c --- /dev/null +++ b/src/activities/undo.rs @@ -0,0 +1,101 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::UndoType, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::helpers::check_guards; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UndoActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: UndoType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, +} + +#[async_trait::async_trait] +impl Activity for UndoActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if let Some(inner_actor) = self.object.get("actor").and_then(|v| v.as_str()) + && inner_actor != self.actor.inner().as_str() + { + return Err(Error::bad_request(anyhow::anyhow!( + "Undo actor does not match inner activity actor" + ))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + let obj_type = self.object.get("type").and_then(|t| t.as_str()).unwrap_or(""); + match obj_type { + "Follow" => { + if let Some(obj_url) = self.object.get("object").and_then(|o| o.as_str()) + && let Ok(url) = Url::parse(obj_url) + && let Some(user_id) = crate::urls::extract_user_id_from_url(&url) + { + data.federation_repo + .remove_follower(user_id, self.actor.inner().as_str()) + .await?; + } + data.object_handler + .on_actor_removed(self.actor.inner()) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %self.actor.inner(), "unfollowed"); + } + "Add" => { + let ap_id_str = self + .object + .get("object") + .and_then(|o| o.get("id")) + .and_then(|id| id.as_str()) + .or_else(|| self.object.get("id").and_then(|id| id.as_str())); + if let Some(ap_id_str) = ap_id_str + && let Ok(ap_id) = Url::parse(ap_id_str) + { + data.object_handler + .on_delete(&ap_id, self.actor.inner()) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(ap_id = %ap_id_str, "undo Add (watchlist remove)"); + } + } + "Like" => { + if let Some(obj_url_str) = self.object.get("object").and_then(|o| o.as_str()) + && let Ok(obj_url) = Url::parse(obj_url_str) + && obj_url.host_str().unwrap_or("") == data.domain + { + data.object_handler + .on_unlike(&obj_url, self.actor.inner()) + .await + .unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process unlike")); + } + tracing::info!(actor = %self.actor.inner(), "received Undo(Like)"); + } + other => { + tracing::debug!(kind = %other, "ignoring Undo of unknown activity type"); + } + } + Ok(()) + } +} diff --git a/src/activities/update.rs b/src/activities/update.rs new file mode 100644 index 0000000..c35d1dc --- /dev/null +++ b/src/activities/update.rs @@ -0,0 +1,69 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::UpdateType, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +use super::helpers::{check_guards, extract_and_dispatch_mentions}; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: UpdateType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, +} + +#[async_trait::async_trait] +impl Activity for UpdateActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { self.actor.inner() } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) + && let Ok(attributed_url) = Url::parse(attributed_to) + && &attributed_url != self.actor.inner() + { + return Err(Error::bad_request(anyhow::anyhow!( + "Update actor does not match object attributedTo" + ))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if check_guards(&self.id, self.actor.inner(), data).await? { + return Ok(()); + } + let ap_id = self + .object + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .unwrap_or_else(|| self.id.clone()); + let actor_url = self.actor.inner().clone(); + extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await; + data.object_handler + .on_update(&ap_id, &actor_url, self.object) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received update activity"); + Ok(()) + } +}