diff --git a/crates/adapters/activitypub-base/Cargo.toml b/crates/adapters/activitypub-base/Cargo.toml deleted file mode 100644 index 3efc249..0000000 --- a/crates/adapters/activitypub-base/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "activitypub-base" -version = "0.1.0" -edition = "2024" - -[dependencies] -tokio = { workspace = true } -futures = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -uuid = { workspace = true } -chrono = { workspace = true } -anyhow = { workspace = true } -tracing = { workspace = true } -async-trait = { workspace = true } -axum = { workspace = true } -reqwest = { workspace = true } -url = { workspace = true } -domain = { workspace = true } - -activitypub_federation = "0.7.0-beta.11" -enum_delegate = "0.2" diff --git a/crates/adapters/activitypub-base/src/activities.rs b/crates/adapters/activitypub-base/src/activities.rs deleted file mode 100644 index 88e2e2f..0000000 --- a/crates/adapters/activitypub-base/src/activities.rs +++ /dev/null @@ -1,871 +0,0 @@ -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::{ - AcceptType, CreateType, DeleteType, FollowType, RejectType, UndoType, UpdateType, - }, - protocol::verification::verify_domains_match, - traits::Activity, -}; -use serde::{Deserialize, Serialize}; -use url::Url; - -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -#[serde(rename = "Announce")] -pub struct AnnounceType; - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename = "Like")] -pub struct LikeType; - -impl Default for LikeType { - fn default() -> Self { - Self - } -} - -use crate::actors::DbActor; -use crate::data::FederationData; -use crate::error::Error; -use crate::repository::{FollowerStatus, FollowingStatus}; - -// --- Follow --- - -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct FollowActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: FollowType, - pub(crate) actor: ObjectId, - pub(crate) object: ObjectId, -} - -#[async_trait::async_trait] -impl Activity for FollowActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, data: &Data) -> Result<(), Self::Error> { - let target_url = self.object.inner(); - let target_domain = match (target_url.host_str(), target_url.port()) { - (Some(host), Some(port)) => format!("{}:{}", host, port), - (Some(host), None) => host.to_string(), - _ => { - return Err(Error::bad_request(anyhow::anyhow!( - "invalid follow target URL" - ))); - } - }; - if target_domain != data.domain { - return Err(Error::bad_request(anyhow::anyhow!( - "follow target is not a local actor" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let _follower = self.actor.dereference(data).await?; - let local_actor = self.object.dereference(data).await?; - - if data - .federation_repo - .is_actor_blocked(local_actor.user_id, self.actor.inner().as_str()) - .await? - { - tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor"); - return Ok(()); - } - - data.federation_repo - .add_follower( - local_actor.user_id, - self.actor.inner().as_str(), - FollowerStatus::Pending, - self.id.as_str(), - ) - .await?; - - tracing::info!( - follower = %self.actor.inner(), - local_user = %local_actor.user_id, - "follow request pending approval" - ); - Ok(()) - } -} - -// --- Accept --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AcceptActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: AcceptType, - pub(crate) actor: ObjectId, - pub(crate) object: FollowActivity, -} - -#[async_trait::async_trait] -impl Activity for AcceptActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if self.actor.inner() != self.object.object.inner() { - return Err(Error::bad_request(anyhow::anyhow!( - "Accept actor does not match Follow target" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner()) - .ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?; - data.federation_repo - .update_following_status( - local_user_id, - self.actor.inner().as_str(), - FollowingStatus::Accepted, - ) - .await?; - - tracing::info!(remote_actor = %self.actor.inner(), "follow accepted by remote"); - Ok(()) - } -} - -// --- Reject --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct RejectActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: RejectType, - pub(crate) actor: ObjectId, - pub(crate) object: FollowActivity, -} - -#[async_trait::async_trait] -impl Activity for RejectActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if self.actor.inner() != self.object.object.inner() { - return Err(Error::bad_request(anyhow::anyhow!( - "Reject actor does not match Follow target" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) { - data.federation_repo - .remove_following(user_id, self.actor.inner().as_str()) - .await?; - } - tracing::info!(actor = %self.actor.inner(), "follow rejected"); - Ok(()) - } -} - -// --- Undo --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct UndoActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: UndoType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, -} - -#[async_trait::async_trait] -impl Activity for UndoActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - // The actor undoing must be the same as the actor in the wrapped activity. - if let Some(inner_actor) = self.object.get("actor").and_then(|v| v.as_str()) - && inner_actor != self.actor.inner().as_str() - { - return Err(Error::bad_request(anyhow::anyhow!( - "Undo actor does not match inner activity actor" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring Undo from blocked domain"); - return Ok(()); - } - - let obj_type = self - .object - .get("type") - .and_then(|t| t.as_str()) - .unwrap_or(""); - - match obj_type { - "Follow" => { - if let Some(obj_url) = self.object.get("object").and_then(|o| o.as_str()) - && let Ok(url) = Url::parse(obj_url) - && let Some(user_id) = crate::urls::extract_user_id_from_url(&url) - { - data.federation_repo - .remove_follower(user_id, self.actor.inner().as_str()) - .await?; - } - data.object_handler - .on_actor_removed(self.actor.inner()) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %self.actor.inner(), "unfollowed"); - } - "Add" => { - let ap_id_str = self - .object - .get("object") - .and_then(|o| o.get("id")) - .and_then(|id| id.as_str()) - .or_else(|| self.object.get("id").and_then(|id| id.as_str())); - - if let Some(ap_id_str) = ap_id_str - && let Ok(ap_id) = Url::parse(ap_id_str) - { - data.object_handler - .on_delete(&ap_id, self.actor.inner()) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(ap_id = %ap_id_str, "undo Add (watchlist remove)"); - } - } - "Like" => { - if let Some(obj_url_str) = self.object.get("object").and_then(|o| o.as_str()) - && let Ok(obj_url) = Url::parse(obj_url_str) - && obj_url.host_str().unwrap_or("") == data.domain - { - data.object_handler - .on_unlike(&obj_url, self.actor.inner()) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "failed to process unlike"); - }); - } - tracing::info!(actor = %self.actor.inner(), "received Undo(Like)"); - } - other => { - tracing::debug!(kind = %other, "ignoring Undo of unknown activity type"); - } - } - - Ok(()) - } -} - -// --- Create --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: CreateType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) bto: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) bcc: Vec, -} - -#[async_trait::async_trait] -impl Activity for CreateActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) - && let Ok(attributed_url) = Url::parse(attributed_to) - && &attributed_url != self.actor.inner() - { - return Err(Error::bad_request(anyhow::anyhow!( - "Create actor does not match object attributedTo" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - // Use the Note's own id, not the Create activity id (which ends in /activity). - // Delete activities reference the Note id, so they must match. - let ap_id = self - .object - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| Url::parse(s).ok()) - .unwrap_or_else(|| self.id.clone()); - let actor_url = self.actor.inner().clone(); - data.object_handler - .on_create(&ap_id, &actor_url, self.object) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %actor_url, "received create activity"); - Ok(()) - } -} - -// --- Delete --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct DeleteActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: DeleteType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, -} - -#[async_trait::async_trait] -impl Activity for DeleteActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - let actor_domain = self.actor.inner().host_str().unwrap_or(""); - let object_domain = match &self.object { - serde_json::Value::String(s) => Url::parse(s) - .ok() - .and_then(|u| u.host_str().map(|h| h.to_string())) - .unwrap_or_default(), - serde_json::Value::Object(o) => o - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| Url::parse(s).ok()) - .and_then(|u| u.host_str().map(|h| h.to_string())) - .unwrap_or_default(), - _ => String::new(), - }; - if !object_domain.is_empty() && actor_domain != object_domain { - return Err(Error::bad_request(anyhow::anyhow!( - "Delete actor domain does not match object domain" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let actor_url = self.actor.inner().clone(); - - // Extract object URL — handles plain string and Tombstone {"id":"...","type":"Tombstone"} - let object_url_str = match &self.object { - serde_json::Value::String(s) => s.clone(), - serde_json::Value::Object(o) => o - .get("id") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - .unwrap_or_default(), - _ => String::new(), - }; - let Ok(object_url) = Url::parse(&object_url_str) else { - tracing::warn!(actor = %actor_url, "Delete activity has unparseable object, ignoring"); - return Ok(()); - }; - - // Actor self-deletion: Mastodon sends Delete(actor_url) when an account is deleted. - if object_url == *self.actor.inner() { - data.object_handler - .on_actor_removed(&actor_url) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %actor_url, "received Delete(actor) — remote account deleted"); - return Ok(()); - } - - // Normal note deletion. - data.object_handler - .on_delete(&object_url, &actor_url) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(object = %object_url, "received Delete(note)"); - Ok(()) - } -} - -// --- Update --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct UpdateActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: UpdateType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, -} - -#[async_trait::async_trait] -impl Activity for UpdateActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) - && let Ok(attributed_url) = Url::parse(attributed_to) - && &attributed_url != self.actor.inner() - { - return Err(Error::bad_request(anyhow::anyhow!( - "Update actor does not match object attributedTo" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let ap_id = self - .object - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| Url::parse(s).ok()) - .unwrap_or_else(|| self.id.clone()); - let actor_url = self.actor.inner().clone(); - data.object_handler - .on_update(&ap_id, &actor_url, self.object) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %actor_url, "received update activity"); - Ok(()) - } -} - -// --- Announce --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AnnounceActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: AnnounceType, - pub(crate) actor: ObjectId, - pub(crate) object: Url, - pub(crate) published: Option>, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, -} - -#[async_trait::async_trait] -impl Activity for AnnounceActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - verify_domains_match(&self.id, self.actor.inner())?; - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - let object_domain = self.object.host_str().unwrap_or(""); - if object_domain != data.domain { - tracing::debug!( - actor = %self.actor.inner(), - object = %self.object, - "received Announce of non-local object — skipped (cross-server boost not supported)" - ); - return Ok(()); - } - data.federation_repo - .add_announce( - self.id.as_str(), - self.object.as_str(), - self.actor.inner().as_str(), - self.published.unwrap_or_else(chrono::Utc::now), - ) - .await?; - data.object_handler - .on_announce_received(&self.object, self.actor.inner()) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "failed to process announce notification"); - }); - tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce"); - Ok(()) - } -} - -// --- Like --- - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct LikeActivity { - pub id: Url, - #[serde(rename = "type")] - pub kind: LikeType, - pub actor: ObjectId, - pub object: Url, -} - -#[async_trait::async_trait] -impl Activity for LikeActivity { - type DataType = FederationData; - type Error = crate::error::Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - verify_domains_match(&self.id, self.actor.inner())?; - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain"); - return Ok(()); - } - - // Only process if the liked object is on our instance. - if self.object.host_str().unwrap_or("") != data.domain { - return Ok(()); - } - - data.object_handler - .on_like(&self.object, self.actor.inner()) - .await - .map_err(|e| crate::error::Error::from(anyhow::anyhow!(e)))?; - - tracing::info!(actor = %self.actor.inner(), object = %self.object, "received like"); - Ok(()) - } -} - -// --- Add --- - -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -#[serde(rename = "Add")] -pub struct AddType; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AddActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: AddType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) to: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub(crate) cc: Vec, -} - -#[async_trait::async_trait] -impl Activity for AddActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) - && let Ok(attributed_url) = Url::parse(attributed_to) - && &attributed_url != self.actor.inner() - { - return Err(Error::bad_request(anyhow::anyhow!( - "Add actor does not match object attributedTo" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring Add from blocked domain"); - return Ok(()); - } - let ap_id = self.id.clone(); - let actor_url = self.actor.inner().clone(); - data.object_handler - .on_create(&ap_id, &actor_url, self.object) - .await - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - tracing::info!(actor = %actor_url, "received Add activity"); - Ok(()) - } -} - -// --- Block --- - -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -#[serde(rename = "Block")] -pub struct BlockType; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct BlockActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: BlockType, - pub(crate) actor: ObjectId, - pub(crate) object: Url, -} - -#[async_trait::async_trait] -impl Activity for BlockActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - verify_domains_match(&self.id, self.actor.inner())?; - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); - return Ok(()); - } - if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { - let _ = data - .federation_repo - .remove_following(local_user_id, self.actor.inner().as_str()) - .await; - let _ = data - .federation_repo - .remove_follower(local_user_id, self.actor.inner().as_str()) - .await; - } - tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower"); - Ok(()) - } -} - -// --- Move (account migration) --- - -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -#[serde(rename = "Move")] -pub struct MoveType; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct MoveActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: MoveType, - pub(crate) actor: ObjectId, - pub(crate) object: Url, - pub(crate) target: Url, -} - -#[async_trait::async_trait] -impl Activity for MoveActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if &self.object != self.actor.inner() { - return Err(Error::bad_request(anyhow::anyhow!( - "Move object must be the actor itself" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - return Ok(()); - } - tracing::info!( - actor = %self.actor.inner(), - target = %self.target, - "received Move (account migration) — target noted" - ); - Ok(()) - } -} - -// --- Inbox dispatch enum --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(tag = "type")] -#[enum_delegate::implement(Activity)] -pub enum InboxActivities { - #[serde(rename = "Follow")] - Follow(FollowActivity), - #[serde(rename = "Accept")] - Accept(AcceptActivity), - #[serde(rename = "Reject")] - Reject(RejectActivity), - #[serde(rename = "Undo")] - Undo(UndoActivity), - #[serde(rename = "Create")] - Create(CreateActivity), - #[serde(rename = "Delete")] - Delete(DeleteActivity), - #[serde(rename = "Update")] - Update(UpdateActivity), - #[serde(rename = "Announce")] - Announce(AnnounceActivity), - #[serde(rename = "Add")] - Add(AddActivity), - #[serde(rename = "Block")] - Block(BlockActivity), - #[serde(rename = "Like")] - Like(LikeActivity), - #[serde(rename = "Move")] - Move(MoveActivity), -} diff --git a/crates/adapters/activitypub-base/src/actor_handler.rs b/crates/adapters/activitypub-base/src/actor_handler.rs deleted file mode 100644 index 7030967..0000000 --- a/crates/adapters/activitypub-base/src/actor_handler.rs +++ /dev/null @@ -1,25 +0,0 @@ -use activitypub_federation::{ - axum::json::FederationJson, config::Data, protocol::context::WithContext, traits::Object, -}; -use axum::extract::Path; - -use crate::actors::{Person, get_local_actor}; -use crate::data::FederationData; -use crate::error::Error; - -pub async fn actor_handler( - Path(username): Path, - data: Data, -) -> Result>, Error> { - let ap_user = data - .user_repo - .find_by_username(&username) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::bad_request(anyhow::anyhow!("user not found")))?; - - let db_actor = get_local_actor(ap_user.id, &data).await?; - let person = db_actor.into_json(&data).await?; - - Ok(FederationJson(WithContext::new_default(person))) -} diff --git a/crates/adapters/activitypub-base/src/actors.rs b/crates/adapters/activitypub-base/src/actors.rs deleted file mode 100644 index e321485..0000000 --- a/crates/adapters/activitypub-base/src/actors.rs +++ /dev/null @@ -1,372 +0,0 @@ -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - http_signatures::generate_actor_keypair, - kinds::actor::PersonType, - protocol::{public_key::PublicKey, verification::verify_domains_match}, - traits::{Actor, Object}, -}; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use url::Url; - -use crate::data::FederationData; -use crate::error::Error; -use crate::repository::RemoteActor; -use crate::user::ApProfileField; - -#[derive(Debug, Clone)] -pub struct DbActor { - pub user_id: uuid::Uuid, - pub username: String, - pub public_key_pem: String, - pub private_key_pem: Option, - pub inbox_url: Url, - pub shared_inbox_url: Option, - pub outbox_url: Url, - pub followers_url: Url, - pub following_url: Url, - pub ap_id: Url, - pub last_refreshed_at: DateTime, - pub bio: Option, - pub avatar_url: Option, - pub banner_url: Option, - pub also_known_as: Option, - pub profile_url: Option, - pub attachment: Vec, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct ApImageObject { - #[serde(rename = "type")] - pub kind: String, - pub url: Url, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Endpoints { - pub shared_inbox: Url, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ProfileFieldObject { - #[serde(rename = "type")] - pub kind: String, - pub name: String, - pub value: String, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct Person { - #[serde(rename = "type")] - kind: PersonType, - id: ObjectId, - preferred_username: String, - inbox: Url, - outbox: Url, - followers: Url, - following: Url, - public_key: PublicKey, - name: Option, - #[serde(skip_serializing_if = "Option::is_none")] - summary: Option, - #[serde(skip_serializing_if = "Option::is_none")] - icon: Option, - #[serde(skip_serializing_if = "Option::is_none")] - url: Option, - #[serde(skip_serializing_if = "Option::is_none")] - discoverable: Option, - manually_approves_followers: bool, - #[serde(skip_serializing_if = "Option::is_none", default)] - updated: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - endpoints: Option, - #[serde(skip_serializing_if = "Option::is_none")] - image: Option, - #[serde(rename = "alsoKnownAs", skip_serializing_if = "Vec::is_empty", default)] - also_known_as: Vec, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - attachment: Vec, -} - -struct ActorUrls { - ap_id: Url, - inbox_url: Url, - shared_inbox_url: Option, - outbox_url: Url, - followers_url: Url, - following_url: Url, -} - -impl ActorUrls { - fn build(base_url: &str, user_id: uuid::Uuid) -> Self { - let ap_id = crate::urls::actor_url(base_url, user_id); - Self { - inbox_url: Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url"), - shared_inbox_url: Url::parse(&format!("{}/inbox", base_url)).ok(), - outbox_url: Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url"), - followers_url: Url::parse(&format!("{}/followers", &ap_id)).expect("valid url"), - following_url: Url::parse(&format!("{}/following", &ap_id)).expect("valid url"), - ap_id, - } - } -} - -pub async fn get_local_actor( - user_id: uuid::Uuid, - data: &Data, -) -> Result { - let user = data - .user_repo - .find_by_id(user_id) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?; - - let (public_key, private_key) = match data - .federation_repo - .get_local_actor_keypair(user_id) - .await? - { - Some(kp) => kp, - None => { - let kp = generate_actor_keypair()?; - data.federation_repo - .save_local_actor_keypair(user_id, kp.public_key.clone(), kp.private_key.clone()) - .await?; - (kp.public_key, kp.private_key) - } - }; - - let ActorUrls { - ap_id, - inbox_url, - shared_inbox_url, - outbox_url, - followers_url, - following_url, - } = ActorUrls::build(&data.base_url, user_id); - - Ok(DbActor { - user_id, - username: user.username, - public_key_pem: public_key, - private_key_pem: Some(private_key), - inbox_url, - shared_inbox_url, - outbox_url, - followers_url, - following_url, - ap_id, - last_refreshed_at: Utc::now(), - bio: user.bio, - avatar_url: user.avatar_url, - banner_url: user.banner_url, - also_known_as: user.also_known_as, - profile_url: user.profile_url, - attachment: user.attachment, - }) -} - -#[async_trait::async_trait] -impl Object for DbActor { - type DataType = FederationData; - type Kind = Person; - type Error = Error; - - fn id(&self) -> &Url { - &self.ap_id - } - - fn last_refreshed_at(&self) -> Option> { - Some(self.last_refreshed_at) - } - - async fn read_from_id( - object_id: Url, - data: &Data, - ) -> Result, Self::Error> { - let user_id = match crate::urls::extract_user_id_from_url(&object_id) { - Some(id) => id, - None => return Ok(None), - }; - let user = match data.user_repo.find_by_id(user_id).await { - Ok(Some(u)) => u, - _ => return Ok(None), - }; - - let keypair = data - .federation_repo - .get_local_actor_keypair(user_id) - .await?; - - let (public_key, private_key) = match keypair { - Some(kp) => (kp.0, Some(kp.1)), - None => return Ok(None), - }; - - let ActorUrls { - ap_id, - inbox_url, - shared_inbox_url, - outbox_url, - followers_url, - following_url, - } = ActorUrls::build(&data.base_url, user_id); - - Ok(Some(DbActor { - user_id, - username: user.username, - public_key_pem: public_key, - private_key_pem: private_key, - inbox_url, - shared_inbox_url, - outbox_url, - followers_url, - following_url, - ap_id, - last_refreshed_at: Utc::now(), - bio: None, - avatar_url: None, - banner_url: None, - also_known_as: None, - profile_url: None, - attachment: vec![], - })) - } - - async fn into_json(self, data: &Data) -> Result { - let public_key = PublicKey { - id: format!("{}#main-key", &self.ap_id), - owner: self.ap_id.clone(), - public_key_pem: self.public_key_pem.clone(), - }; - - let icon = self.avatar_url.map(|url| ApImageObject { - kind: "Image".to_string(), - url, - }); - let image = self.banner_url.map(|url| ApImageObject { - kind: "Image".to_string(), - url, - }); - let profile_url = self.profile_url; - let also_known_as: Vec = self.also_known_as.into_iter().collect(); - let attachment: Vec = self - .attachment - .into_iter() - .map(|f| ProfileFieldObject { - kind: "PropertyValue".to_string(), - name: f.name, - value: f.value, - }) - .collect(); - - let shared_inbox = - Url::parse(&format!("{}/inbox", data.base_url)).expect("base_url is always valid"); - - Ok(Person { - kind: Default::default(), - id: self.ap_id.clone().into(), - preferred_username: self.username.clone(), - inbox: self.inbox_url.clone(), - outbox: self.outbox_url.clone(), - followers: self.followers_url.clone(), - following: self.following_url.clone(), - public_key, - name: Some(self.username.clone()), - summary: self.bio.clone(), - icon, - url: profile_url, - discoverable: Some(true), - manually_approves_followers: true, - updated: Some(self.last_refreshed_at), - endpoints: Some(Endpoints { shared_inbox }), - image, - also_known_as, - attachment, - }) - } - - async fn verify( - json: &Self::Kind, - expected_domain: &Url, - _data: &Data, - ) -> Result<(), Self::Error> { - verify_domains_match(json.id.inner(), expected_domain)?; - Ok(()) - } - - async fn from_json(json: Self::Kind, data: &Data) -> Result { - let shared_inbox_url = json.endpoints.as_ref().map(|e| e.shared_inbox.to_string()); - let actor = RemoteActor { - url: json.id.inner().to_string(), - handle: json.preferred_username.clone(), - inbox_url: json.inbox.to_string(), - shared_inbox_url, - display_name: json.name.clone(), - avatar_url: json.icon.as_ref().map(|i| i.url.to_string()), - outbox_url: Some(json.outbox.to_string()), - }; - data.federation_repo.upsert_remote_actor(actor).await?; - - let url_str = json.id.inner().to_string(); - let user_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes()); - let ap_id = json.id.inner().clone(); - let inbox_url = json.inbox.clone(); - let shared_inbox_url = json - .endpoints - .as_ref() - .and_then(|e| Url::parse(e.shared_inbox.as_str()).ok()); - let outbox_url = json.outbox.clone(); - let followers_url = json.followers.clone(); - let following_url = json.following.clone(); - - Ok(DbActor { - user_id, - username: json.preferred_username.clone(), - public_key_pem: json.public_key.public_key_pem, - private_key_pem: None, - inbox_url, - shared_inbox_url, - outbox_url, - followers_url, - following_url, - ap_id, - last_refreshed_at: Utc::now(), - bio: json.summary.clone(), - avatar_url: json.icon.as_ref().map(|i| i.url.clone()), - banner_url: json.image.as_ref().map(|i| i.url.clone()), - also_known_as: json.also_known_as.into_iter().next(), - profile_url: json.url.clone(), - attachment: json - .attachment - .iter() - .map(|f| crate::user::ApProfileField { - name: f.name.clone(), - value: f.value.clone(), - }) - .collect(), - }) - } -} - -impl Actor for DbActor { - fn public_key_pem(&self) -> &str { - &self.public_key_pem - } - - fn private_key_pem(&self) -> Option { - self.private_key_pem.clone() - } - - fn inbox(&self) -> Url { - self.inbox_url.clone() - } -} - -#[cfg(test)] -#[path = "tests/actors.rs"] -mod tests; diff --git a/crates/adapters/activitypub-base/src/content.rs b/crates/adapters/activitypub-base/src/content.rs deleted file mode 100644 index e6156d9..0000000 --- a/crates/adapters/activitypub-base/src/content.rs +++ /dev/null @@ -1,68 +0,0 @@ -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use url::Url; - -#[async_trait] -pub trait ApObjectHandler: Send + Sync { - /// Returns (ap_id, serialized object) for all local content owned by this user. - /// Used by outbox (count) and backfill (delivery). Must only return locally-authored content. - async fn get_local_objects_for_user( - &self, - user_id: uuid::Uuid, - ) -> anyhow::Result>; - - /// Returns up to `limit` objects ordered newest-first, published before `before`. - /// Returns (ap_id, object_json, published_at). - async fn get_local_objects_page( - &self, - user_id: uuid::Uuid, - before: Option>, - limit: usize, - ) -> anyhow::Result)>>; - - /// Incoming Create activity — persist remote content. - async fn on_create( - &self, - ap_id: &Url, - actor_url: &Url, - object: serde_json::Value, - ) -> anyhow::Result<()>; - - /// Incoming Update activity — update existing remote content. - async fn on_update( - &self, - ap_id: &Url, - actor_url: &Url, - object: serde_json::Value, - ) -> anyhow::Result<()>; - - /// Incoming Delete activity — remove specific remote content. - async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()>; - - /// Actor unfollowed/was removed — clean up all their remote content. - async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>; - - /// Called when a remote actor likes a local thought. - /// `object_url` is the AP URL of the liked note (e.g. `{base}/thoughts/{uuid}`). - /// `actor_url` is the AP URL of the remote actor who sent the Like. - async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - - /// Called when a remote actor boosts (Announce) a local thought. - /// `object_url` is the AP URL of the announced note. - /// `actor_url` is the AP URL of the remote actor who sent the Announce. - async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - - /// Called when a remote actor removes a Like from a local thought. - async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - - /// Called when an inbound Note tags a local user with a Mention. - async fn on_mention( - &self, - thought_ap_id: &Url, - mentioned_user_uuid: uuid::Uuid, - actor_url: &Url, - ) -> anyhow::Result<()>; - - /// Total number of locally-authored posts across all users. - async fn count_local_posts(&self) -> anyhow::Result; -} diff --git a/crates/adapters/activitypub-base/src/data.rs b/crates/adapters/activitypub-base/src/data.rs deleted file mode 100644 index 2f3497c..0000000 --- a/crates/adapters/activitypub-base/src/data.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::sync::Arc; - -use crate::content::ApObjectHandler; -use crate::repository::FederationRepository; -use crate::user::ApUserRepository; -use domain::ports::EventPublisher; - -#[derive(Clone)] -pub struct FederationData { - pub(crate) federation_repo: Arc, - pub(crate) user_repo: Arc, - pub(crate) object_handler: Arc, - pub(crate) base_url: String, - pub(crate) domain: String, - pub(crate) allow_registration: bool, - pub(crate) software_name: String, - #[allow(dead_code)] - pub(crate) event_publisher: Option>, -} - -impl FederationData { - pub fn new( - federation_repo: Arc, - user_repo: Arc, - object_handler: Arc, - base_url: String, - allow_registration: bool, - software_name: String, - event_publisher: Option>, - ) -> Self { - let domain = base_url - .trim_start_matches("https://") - .trim_start_matches("http://") - .split('/') - .next() - .unwrap_or("") - .to_string(); - Self { - federation_repo, - user_repo, - object_handler, - base_url, - domain, - allow_registration, - software_name, - event_publisher, - } - } -} diff --git a/crates/adapters/activitypub-base/src/error.rs b/crates/adapters/activitypub-base/src/error.rs deleted file mode 100644 index d631755..0000000 --- a/crates/adapters/activitypub-base/src/error.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::fmt::{Display, Formatter}; - -use axum::http::StatusCode; - -#[derive(Debug)] -pub struct Error(pub(crate) anyhow::Error, pub(crate) StatusCode); - -impl Error { - pub fn not_found(e: impl Into) -> Self { - Self(e.into(), StatusCode::NOT_FOUND) - } - - pub fn bad_request(e: impl Into) -> Self { - Self(e.into(), StatusCode::BAD_REQUEST) - } -} - -impl Display for Error { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} - -impl From for Error -where - T: Into, -{ - fn from(t: T) -> Self { - Error(t.into(), StatusCode::INTERNAL_SERVER_ERROR) - } -} - -impl axum::response::IntoResponse for Error { - fn into_response(self) -> axum::response::Response { - let status = self.1; - if status.is_server_error() { - tracing::error!(error = %self.0, status = status.as_u16(), "federation error"); - } else { - tracing::debug!(error = %self.0, status = status.as_u16(), "federation response"); - } - let body = if status.is_server_error() { - "internal server error".to_string() - } else { - self.0.to_string() - }; - (status, body).into_response() - } -} diff --git a/crates/adapters/activitypub-base/src/federation.rs b/crates/adapters/activitypub-base/src/federation.rs deleted file mode 100644 index 23d59e2..0000000 --- a/crates/adapters/activitypub-base/src/federation.rs +++ /dev/null @@ -1,49 +0,0 @@ -use activitypub_federation::config::{Data, FederationConfig, FederationMiddleware, UrlVerifier}; -use activitypub_federation::error::Error as FedError; -use url::Url; - -use crate::data::FederationData; - -#[derive(Clone)] -struct PermissiveVerifier; - -#[async_trait::async_trait] -impl UrlVerifier for PermissiveVerifier { - async fn verify(&self, _url: &Url) -> Result<(), FedError> { - Ok(()) - } -} - -#[derive(Clone)] -pub struct ApFederationConfig(pub FederationConfig); - -impl ApFederationConfig { - pub async fn new(data: FederationData, debug: bool) -> anyhow::Result { - let config = if debug { - FederationConfig::builder() - .domain(&data.domain) - .app_data(data) - .debug(true) - .http_signature_compat(true) - .url_verifier(Box::new(PermissiveVerifier)) - .build() - .await? - } else { - FederationConfig::builder() - .domain(&data.domain) - .app_data(data) - .debug(false) - .build() - .await? - }; - Ok(Self(config)) - } - - pub fn to_request_data(&self) -> Data { - self.0.to_request_data() - } - - pub fn middleware(&self) -> FederationMiddleware { - FederationMiddleware::new(self.0.clone()) - } -} diff --git a/crates/adapters/activitypub-base/src/followers_handler.rs b/crates/adapters/activitypub-base/src/followers_handler.rs deleted file mode 100644 index e5de463..0000000 --- a/crates/adapters/activitypub-base/src/followers_handler.rs +++ /dev/null @@ -1,105 +0,0 @@ -use activitypub_federation::{axum::json::FederationJson, config::Data}; -use axum::extract::{Path, Query}; -use serde::Deserialize; -use serde_json::json; - -use crate::data::FederationData; -use crate::error::Error; -use crate::urls::AP_PAGE_SIZE; - -#[derive(Deserialize)] -pub struct PageQuery { - page: Option, -} - -async fn collection_handler( - user_id_str: &str, - query: PageQuery, - data: Data, - collection_type: &str, -) -> Result, Error> { - let user_id = uuid::Uuid::parse_str(user_id_str) - .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; - - data.user_repo - .find_by_id(user_id) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let collection_id = format!( - "{}/users/{}/{}", - data.base_url, user_id_str, collection_type - ); - - let total = match collection_type { - "followers" => data.federation_repo.count_followers(user_id).await, - _ => data.federation_repo.count_following(user_id).await, - } - .map_err(Error::from)?; - - if let Some(page) = query.page { - let page = page.max(1); - let offset = (page.saturating_sub(1) as usize) * AP_PAGE_SIZE; - - let items: Vec = match collection_type { - "followers" => data - .federation_repo - .get_followers_page(user_id, offset as u32, AP_PAGE_SIZE) - .await - .map_err(Error::from)? - .into_iter() - .map(|f| f.actor.url) - .collect(), - _ => data - .federation_repo - .get_following_page(user_id, offset as u32, AP_PAGE_SIZE) - .await - .map_err(Error::from)? - .into_iter() - .map(|a| a.url) - .collect(), - }; - - let has_next = offset + items.len() < total; - - let mut obj = json!({ - "@context": crate::urls::AP_CONTEXT, - "type": "OrderedCollectionPage", - "id": format!("{}?page={}", collection_id, page), - "partOf": collection_id, - "totalItems": total, - "orderedItems": items, - }); - - if has_next { - obj["next"] = json!(format!("{}?page={}", collection_id, page + 1)); - } - - Ok(FederationJson(obj)) - } else { - Ok(FederationJson(json!({ - "@context": crate::urls::AP_CONTEXT, - "type": "OrderedCollection", - "id": collection_id, - "totalItems": total, - "first": format!("{}?page=1", collection_id), - }))) - } -} - -pub async fn followers_handler( - Path(user_id_str): Path, - Query(query): Query, - data: Data, -) -> Result, Error> { - collection_handler(&user_id_str, query, data, "followers").await -} - -pub async fn following_handler( - Path(user_id_str): Path, - Query(query): Query, - data: Data, -) -> Result, Error> { - collection_handler(&user_id_str, query, data, "following").await -} diff --git a/crates/adapters/activitypub-base/src/inbox.rs b/crates/adapters/activitypub-base/src/inbox.rs deleted file mode 100644 index 2f2d063..0000000 --- a/crates/adapters/activitypub-base/src/inbox.rs +++ /dev/null @@ -1,18 +0,0 @@ -use activitypub_federation::{ - axum::inbox::{ActivityData, receive_activity}, - config::Data, - protocol::context::WithContext, -}; - -use crate::activities::InboxActivities; -use crate::actors::DbActor; -use crate::data::FederationData; -use crate::error::Error; - -pub async fn inbox_handler( - data: Data, - activity_data: ActivityData, -) -> Result<(), Error> { - receive_activity::, DbActor, FederationData>(activity_data, &data) - .await -} diff --git a/crates/adapters/activitypub-base/src/lib.rs b/crates/adapters/activitypub-base/src/lib.rs deleted file mode 100644 index b4d53dd..0000000 --- a/crates/adapters/activitypub-base/src/lib.rs +++ /dev/null @@ -1,32 +0,0 @@ -pub mod activities; -pub mod actor_handler; -pub mod actors; -pub mod ap_ports; -pub mod content; -pub mod data; -pub mod error; -pub mod federation; -pub mod followers_handler; -pub mod inbox; -pub mod nodeinfo; -pub mod outbox; -pub mod repository; -pub mod service; -pub(crate) mod urls; -pub use urls::AS_PUBLIC; -pub mod user; -pub mod webfinger; - -pub use activitypub_federation::kinds::object::NoteType; -pub use ap_ports::{ - AcceptNoteInput, ActivityPubRepository, ActorApUrls, OutboundFederationPort, OutboxEntry, -}; -pub use content::ApObjectHandler; -pub use data::FederationData; -pub use error::Error; -pub use federation::ApFederationConfig; -pub use repository::{ - BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, -}; -pub use service::ActivityPubService; -pub use user::{ApProfileField, ApUser, ApUserRepository}; diff --git a/crates/adapters/activitypub-base/src/nodeinfo.rs b/crates/adapters/activitypub-base/src/nodeinfo.rs deleted file mode 100644 index f619d75..0000000 --- a/crates/adapters/activitypub-base/src/nodeinfo.rs +++ /dev/null @@ -1,82 +0,0 @@ -use activitypub_federation::config::Data; -use axum::Json; -use serde::Serialize; - -use crate::data::FederationData; -use crate::error::Error; - -const NODEINFO_2_0_REL: &str = "http://nodeinfo.diaspora.software/ns/schema/2.0"; - -#[derive(Serialize)] -pub struct NodeInfoWellKnown { - pub links: Vec, -} - -#[derive(Serialize)] -pub struct NodeInfoLink { - pub rel: String, - pub href: String, -} - -#[derive(Serialize)] -pub struct NodeInfoSoftware { - pub name: String, - pub version: String, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct NodeInfoUsage { - pub users: NodeInfoUsers, - pub local_posts: u64, -} - -#[derive(Serialize)] -pub struct NodeInfoUsers { - pub total: usize, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct NodeInfo { - pub version: String, - pub software: NodeInfoSoftware, - pub protocols: Vec, - pub usage: NodeInfoUsage, - pub open_registrations: bool, -} - -pub async fn nodeinfo_well_known_handler( - data: Data, -) -> Result, Error> { - let href = format!("{}/nodeinfo/2.0", data.base_url); - Ok(Json(NodeInfoWellKnown { - links: vec![NodeInfoLink { - rel: NODEINFO_2_0_REL.to_string(), - href, - }], - })) -} - -pub async fn nodeinfo_handler(data: Data) -> Result, Error> { - let user_count = data.user_repo.count_users().await.unwrap_or(0); - let local_posts = data.object_handler.count_local_posts().await.unwrap_or(0); - - Ok(Json(NodeInfo { - version: "2.0".to_string(), - software: NodeInfoSoftware { - name: data.software_name.clone(), - version: env!("CARGO_PKG_VERSION").to_string(), - }, - protocols: vec!["activitypub".to_string()], - usage: NodeInfoUsage { - users: NodeInfoUsers { total: user_count }, - local_posts, - }, - open_registrations: data.allow_registration, - })) -} - -#[cfg(test)] -#[path = "tests/nodeinfo.rs"] -mod tests; diff --git a/crates/adapters/activitypub-base/src/outbox.rs b/crates/adapters/activitypub-base/src/outbox.rs deleted file mode 100644 index 80f004e..0000000 --- a/crates/adapters/activitypub-base/src/outbox.rs +++ /dev/null @@ -1,138 +0,0 @@ -use axum::extract::{Path, Query}; -use axum::response::IntoResponse; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use url::Url; - -use activitypub_federation::{ - config::Data, fetch::object_id::ObjectId, kinds::activity::CreateType, - protocol::context::WithContext, -}; - -use crate::{activities::CreateActivity, data::FederationData, error::Error, urls::AP_PAGE_SIZE}; - -#[derive(Deserialize)] -pub struct OutboxQuery { - page: Option, - before: Option, -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct OrderedCollection { - #[serde(rename = "@context")] - context: String, - #[serde(rename = "type")] - kind: String, - id: String, - total_items: u64, - first: String, -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct OrderedCollectionPage { - #[serde(rename = "@context")] - context: String, - #[serde(rename = "type")] - kind: String, - id: String, - part_of: String, - ordered_items: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - next: Option, -} - -pub async fn outbox_handler( - Path(user_id_str): Path, - Query(query): Query, - data: Data, -) -> Result { - let uuid = uuid::Uuid::parse_str(&user_id_str) - .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; - - data.user_repo - .find_by_id(uuid) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let outbox_url = format!("{}/users/{}/outbox", data.base_url, user_id_str); - - if query.page.unwrap_or(false) { - let before: Option> = query.before.as_deref().and_then(|s| s.parse().ok()); - - let items = data - .object_handler - .get_local_objects_page(uuid, before, AP_PAGE_SIZE) - .await - .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?; - - let actor_url: Url = format!("{}/users/{}", data.base_url, user_id_str) - .parse() - .expect("valid url"); - - let has_more = items.len() == AP_PAGE_SIZE; - let oldest_ts = items.last().map(|(_, _, ts)| *ts); - - let followers_url = format!("{}/followers", actor_url); - let ordered_items: Vec = items - .into_iter() - .map(|(ap_id, object, _)| { - let create_id = Url::parse(&format!("{}/activity", ap_id)).expect("valid url"); - serde_json::to_value(WithContext::new_default(CreateActivity { - id: create_id, - kind: CreateType::default(), - actor: ObjectId::from(actor_url.clone()), - object, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![followers_url.clone()], - bto: vec![], - bcc: vec![], - })) - .expect("serializable") - }) - .collect(); - - let page_id = match &query.before { - Some(b) => format!("{}?page=true&before={}", outbox_url, b), - None => format!("{}?page=true", outbox_url), - }; - - let next = if has_more { - oldest_ts.map(|ts| { - // Use RFC 3339 with Z suffix (no + sign) to avoid percent-encoding - let ts_str = ts.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); - format!("{}?page=true&before={}", outbox_url, ts_str) - }) - } else { - None - }; - - Ok(axum::Json(OrderedCollectionPage { - context: crate::urls::AP_CONTEXT.to_string(), - kind: "OrderedCollectionPage".to_string(), - id: page_id, - part_of: outbox_url, - ordered_items, - next, - }) - .into_response()) - } else { - let total = data - .object_handler - .get_local_objects_for_user(uuid) - .await - .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))? - .len() as u64; - - Ok(axum::Json(OrderedCollection { - context: crate::urls::AP_CONTEXT.to_string(), - kind: "OrderedCollection".to_string(), - id: outbox_url.clone(), - total_items: total, - first: format!("{}?page=true", outbox_url), - }) - .into_response()) - } -} diff --git a/crates/adapters/activitypub-base/src/repository.rs b/crates/adapters/activitypub-base/src/repository.rs deleted file mode 100644 index 0aab3c2..0000000 --- a/crates/adapters/activitypub-base/src/repository.rs +++ /dev/null @@ -1,134 +0,0 @@ -use anyhow::Result; -use async_trait::async_trait; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FollowerStatus { - Pending, - Accepted, - Rejected, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FollowingStatus { - Pending, - Accepted, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct RemoteActor { - pub url: String, - pub handle: String, - pub inbox_url: String, - pub shared_inbox_url: Option, - pub display_name: Option, - pub avatar_url: Option, - pub outbox_url: Option, -} - -#[derive(Debug, Clone)] -pub struct Follower { - pub actor: RemoteActor, - pub status: FollowerStatus, -} - -#[derive(Debug, Clone)] -pub struct BlockedDomain { - pub domain: String, - pub reason: Option, - pub blocked_at: String, -} - -#[async_trait] -pub trait FederationRepository: Send + Sync { - async fn add_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - status: FollowerStatus, - follow_activity_id: &str, - ) -> Result<()>; - async fn get_follower_follow_activity_id( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result>; - async fn remove_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result<()>; - async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result>; - async fn get_followers_page( - &self, - local_user_id: uuid::Uuid, - offset: u32, - limit: usize, - ) -> Result>; - async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result; - async fn get_following_page( - &self, - local_user_id: uuid::Uuid, - offset: u32, - limit: usize, - ) -> Result>; - async fn update_follower_status( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - status: FollowerStatus, - ) -> Result<()>; - async fn add_following( - &self, - local_user_id: uuid::Uuid, - actor: RemoteActor, - follow_activity_id: &str, - ) -> Result<()>; - async fn get_follow_activity_id( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result>; - async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; - async fn get_following(&self, local_user_id: uuid::Uuid) -> Result>; - async fn count_following(&self, local_user_id: uuid::Uuid) -> Result; - async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>; - async fn get_remote_actor(&self, actor_url: &str) -> Result>; - async fn get_local_actor_keypair( - &self, - user_id: uuid::Uuid, - ) -> Result>; - async fn save_local_actor_keypair( - &self, - user_id: uuid::Uuid, - public_key: String, - private_key: String, - ) -> Result<()>; - async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result>; - async fn update_following_status( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - status: FollowingStatus, - ) -> Result<()>; - async fn get_following_outbox_url( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result>; - async fn add_announce( - &self, - activity_id: &str, - object_url: &str, - actor_url: &str, - announced_at: chrono::DateTime, - ) -> Result<()>; - async fn count_announces(&self, object_url: &str) -> Result; - async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()>; - async fn remove_blocked_domain(&self, domain: &str) -> Result<()>; - async fn get_blocked_domains(&self) -> Result>; - async fn is_domain_blocked(&self, domain: &str) -> Result; - async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; - async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; - async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result>; - async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result; -} diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs deleted file mode 100644 index 26a83cb..0000000 --- a/crates/adapters/activitypub-base/src/service.rs +++ /dev/null @@ -1,2225 +0,0 @@ -use std::sync::Arc; - -use domain::ports::FederationFetchPort; - -use activitypub_federation::{ - activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext, - traits::Actor, -}; -use axum::{Router, routing::get, routing::post}; -use url::Url; - -use crate::{ - activities::{ - AcceptActivity, CreateActivity, FollowActivity, RejectActivity, UndoActivity, - UpdateActivity, - }, - actors::{DbActor, get_local_actor}, - content::ApObjectHandler, - data::FederationData, - federation::ApFederationConfig, - inbox::inbox_handler, - nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, - outbox::outbox_handler, - repository::{ - BlockedDomain, FederationRepository, FollowerStatus, FollowingStatus, RemoteActor, - }, - urls::activity_url, - user::ApUserRepository, - webfinger::webfinger_handler, -}; - -const DELIVERY_MAX_ATTEMPTS: u32 = 3; -const DELIVERY_INITIAL_DELAY_SECS: u64 = 1; -const HTTP_FETCH_TIMEOUT_SECS: u64 = 30; -const BATCH_FETCH_SLEEP_MS: u64 = 100; - -fn content_to_html(text: &str) -> String { - let escaped = text - .replace('&', "&") - .replace('<', "<") - .replace('>', ">") - .replace('"', """); - let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect(); - if paragraphs.is_empty() { - format!("

{}

", escaped) - } else { - paragraphs - .iter() - .map(|p| format!("

{}

", p)) - .collect::>() - .join("") - } -} - -fn thought_note_json( - thought: &domain::models::thought::Thought, - local_actor: &crate::actors::DbActor, - base_url: &str, - in_reply_to_url: Option<&str>, -) -> anyhow::Result<(url::Url, serde_json::Value)> { - let ap_id = url::Url::parse(&format!("{}/thoughts/{}", base_url, thought.id))?; - - // Build to/cc based on visibility per AP spec. - let (to, cc) = match thought.visibility { - domain::models::thought::Visibility::Public => ( - vec![crate::urls::AS_PUBLIC.to_string()], - vec![local_actor.followers_url.to_string()], - ), - domain::models::thought::Visibility::Unlisted => ( - vec![local_actor.followers_url.to_string()], - vec![crate::urls::AS_PUBLIC.to_string()], - ), - domain::models::thought::Visibility::Followers => { - (vec![local_actor.followers_url.to_string()], vec![]) - } - domain::models::thought::Visibility::Direct => (vec![], vec![]), - }; - - let mut note = serde_json::json!({ - "type": "Note", - "id": ap_id.to_string(), - "url": ap_id.to_string(), - "attributedTo": local_actor.ap_id.to_string(), - "content": content_to_html(thought.content.as_str()), - "published": thought.created_at.to_rfc3339(), - "to": to, - "cc": cc, - "sensitive": thought.sensitive, - }); - if let Some(ref cw) = thought.content_warning { - note["summary"] = serde_json::json!(cw); - } - if let Some(reply_url) = in_reply_to_url { - note["inReplyTo"] = serde_json::json!(reply_url); - } - if let Some(updated_at) = thought.updated_at { - note["updated"] = serde_json::json!(updated_at.to_rfc3339()); - } - let hashtags = domain::hashtag::extract(thought.content.as_str()); - if !hashtags.is_empty() { - let ap_tags: Vec = hashtags - .iter() - .map(|h| { - serde_json::json!({ - "type": "Hashtag", - "name": h.ap_name, - "href": format!("{}/{}", base_url, h.url_slug), - }) - }) - .collect(); - note["tag"] = serde_json::json!(ap_tags); - } - Ok((ap_id, note)) -} - -fn collect_inboxes(followers: &[crate::repository::Follower]) -> Vec { - let mut seen = std::collections::HashSet::new(); - let mut inboxes = Vec::new(); - for f in followers { - let inbox_str = f - .actor - .shared_inbox_url - .as_deref() - .unwrap_or(&f.actor.inbox_url); - if seen.insert(inbox_str.to_string()) - && let Ok(url) = Url::parse(inbox_str) - { - inboxes.push(url); - } - } - inboxes -} - -pub(crate) async fn send_with_retry( - sends: Vec, - data: &activitypub_federation::config::Data, -) -> Vec { - let mut failures = vec![]; - for send in sends { - let mut delay = std::time::Duration::from_secs(DELIVERY_INITIAL_DELAY_SECS); - for attempt in 1..=DELIVERY_MAX_ATTEMPTS { - match send.clone().sign_and_send(data).await { - Ok(()) => break, - Err(e) if attempt < DELIVERY_MAX_ATTEMPTS => { - tracing::warn!(attempt, error = %e, "delivery failed, retrying"); - tokio::time::sleep(delay).await; - delay *= 2; - } - Err(e) => { - tracing::error!(attempt, error = %e, "delivery failed permanently"); - failures.push(anyhow::anyhow!(e)); - } - } - } - } - failures -} - -#[derive(Clone)] -pub struct ActivityPubService { - federation_config: ApFederationConfig, - base_url: String, - connections_repo: Arc, -} - -pub struct ActivityPubServiceBuilder { - repo: Arc, - user_repo: Arc, - object_handler: Arc, - base_url: String, - connections_repo: Arc, - allow_registration: bool, - software_name: String, - debug: bool, - event_publisher: Option>, -} - -impl ActivityPubServiceBuilder { - pub fn allow_registration(mut self, v: bool) -> Self { - self.allow_registration = v; - self - } - pub fn software_name(mut self, v: impl Into) -> Self { - self.software_name = v.into(); - self - } - pub fn debug(mut self, v: bool) -> Self { - self.debug = v; - self - } - pub fn event_publisher(mut self, v: Arc) -> Self { - self.event_publisher = Some(v); - self - } - pub async fn build(self) -> anyhow::Result { - let data = FederationData::new( - self.repo, - self.user_repo, - self.object_handler, - self.base_url.clone(), - self.allow_registration, - self.software_name, - self.event_publisher, - ); - let federation_config = ApFederationConfig::new(data, self.debug).await?; - Ok(ActivityPubService { - federation_config, - base_url: self.base_url, - connections_repo: self.connections_repo, - }) - } -} - -impl ActivityPubService { - pub fn builder( - repo: Arc, - user_repo: Arc, - object_handler: Arc, - base_url: impl Into, - connections_repo: Arc, - ) -> ActivityPubServiceBuilder { - ActivityPubServiceBuilder { - repo, - user_repo, - object_handler, - base_url: base_url.into(), - connections_repo, - allow_registration: false, - software_name: String::new(), - debug: false, - event_publisher: None, - } - } - - pub fn federation_config(&self) -> &ApFederationConfig { - &self.federation_config - } - - pub fn request_data(&self) -> activitypub_federation::config::Data { - self.federation_config.to_request_data() - } - - /// Returns `(local_actor, deduplicated_inboxes)` for all accepted followers, - /// excluding blocked actors and blocked domains. - /// Returns `None` if there are no eligible followers. - async fn accepted_follower_inboxes( - &self, - data: &activitypub_federation::config::Data, - local_user_id: uuid::Uuid, - ) -> anyhow::Result)>> { - let local_actor = get_local_actor(local_user_id, data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data - .federation_repo - .get_blocked_actors(local_user_id) - .await - .unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data - .federation_repo - .get_blocked_domains() - .await - .unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); - - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { - return Ok(None); - } - - Ok(Some((local_actor, collect_inboxes(&accepted)))) - } - - pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result { - use activitypub_federation::traits::Object; - let uuid = uuid::Uuid::parse_str(user_id_str)?; - let data = self.federation_config.to_request_data(); - let actor = get_local_actor(uuid, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let person = actor - .into_json(&data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - Ok(serde_json::to_string(&WithContext::new_default(person))?) - } - - /// Returns the ActivityPub router compatible with any outer state `S`. - /// Handlers only use `Data` injected by the middleware layer, - /// so the router is independent of the application state type. - pub fn router(&self) -> Router - where - S: Clone + Send + Sync + 'static, - { - Router::new() - .route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler)) - .route("/nodeinfo/2.0", get(nodeinfo_handler)) - .route("/.well-known/webfinger", get(webfinger_handler)) - .route("/inbox", post(inbox_handler)) - .route("/users/{id}/inbox", post(inbox_handler)) - .route("/users/{id}/outbox", get(outbox_handler)) - .layer(self.federation_config.middleware()) - } - - /// Fan out an Announce activity to all accepted followers. - pub async fn broadcast_announce_to_followers( - &self, - local_user_id: uuid::Uuid, - object_ap_id: url::Url, - ) -> anyhow::Result<()> { - // Deterministic ID so Undo(Announce) can reference this same activity. - let announce_id = url::Url::parse(&format!( - "{}/activities/announce/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", local_user_id, object_ap_id).as_bytes(), - ) - )) - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let announce = crate::activities::AnnounceActivity { - id: announce_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: object_ap_id, - published: Some(chrono::Utc::now()), - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - - let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(announce), - &local_actor, - inboxes, - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Announce deliveries failed"); - } - Ok(()) - } - - /// Fan out an Undo(Announce) activity to all accepted followers. - pub async fn broadcast_undo_announce_to_followers( - &self, - local_user_id: uuid::Uuid, - object_ap_id: url::Url, - ) -> anyhow::Result<()> { - // Reconstruct the same deterministic announce ID used when the boost was sent. - let announce_id = url::Url::parse(&format!( - "{}/activities/announce/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", local_user_id, object_ap_id).as_bytes(), - ) - )) - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let undo_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let undo = crate::activities::UndoActivity { - id: undo_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: serde_json::json!({ - "type": "Announce", - "id": announce_id.to_string(), - "actor": local_actor.ap_id.to_string(), - "object": object_ap_id.to_string(), - }), - }; - - let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(undo), - &local_actor, - inboxes, - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some Undo(Announce) deliveries failed" - ); - } - Ok(()) - } - - /// Send a Like activity to a single inbox. - pub async fn broadcast_like_to_inbox( - &self, - liker_user_id: uuid::Uuid, - object_ap_id: url::Url, - author_inbox_url: url::Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(liker_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - // Deterministic ID so Undo(Like) can reference the same activity. - let like_id = url::Url::parse(&format!( - "{}/activities/like/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", liker_user_id, object_ap_id).as_bytes(), - ) - ))?; - - let like = crate::activities::LikeActivity { - id: like_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: object_ap_id, - }; - - let sends = SendActivityTask::prepare( - &WithContext::new_default(like), - &local_actor, - vec![author_inbox_url], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some Like deliveries failed permanently" - ); - } - Ok(()) - } - - /// Send an Undo(Like) activity to a single inbox. - pub async fn broadcast_undo_like_to_inbox( - &self, - liker_user_id: uuid::Uuid, - object_ap_id: url::Url, - author_inbox_url: url::Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(liker_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - // Reconstruct the same deterministic like ID. - let like_id = url::Url::parse(&format!( - "{}/activities/like/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", liker_user_id, object_ap_id).as_bytes(), - ) - ))?; - - let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - - let undo = crate::activities::UndoActivity { - id: undo_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::json!({ - "type": "Like", - "id": like_id.to_string(), - "actor": local_actor.ap_id.to_string(), - "object": object_ap_id.to_string(), - }), - }; - - let sends = SendActivityTask::prepare( - &WithContext::new_default(undo), - &local_actor, - vec![author_inbox_url], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some Undo(Like) deliveries failed permanently" - ); - } - Ok(()) - } - - /// Resolve a `@user@domain` handle to a `DbActor` over HTTPS directly. - /// The library's `webfinger_resolve_actor` tries HTTP first in debug mode, which breaks - /// on servers that don't redirect HTTP → HTTPS. - async fn webfinger_https( - handle: &str, - data: &activitypub_federation::config::Data, - ) -> anyhow::Result { - let normalized = handle.trim_start_matches('@'); - let at = normalized - .rfind('@') - .ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?; - let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]); - let wf_url = format!( - "https://{}/.well-known/webfinger?resource=acct:{}@{}", - domain_str, user, domain_str - ); - let wf: serde_json::Value = reqwest::Client::new() - .get(&wf_url) - .header("Accept", "application/jrd+json, application/json") - .send() - .await? - .json() - .await?; - let self_href = wf["links"] - .as_array() - .and_then(|links| { - links.iter().find(|l| { - l["rel"].as_str() == Some("self") - && l["type"].as_str() == Some("application/activity+json") - }) - }) - .and_then(|l| l["href"].as_str()) - .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))? - .to_owned(); - let self_url = url::Url::parse(&self_href)?; - let actor: DbActor = ObjectId::from(self_url) - .dereference(data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - Ok(actor) - } - - pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - let normalized = handle.trim_start_matches('@'); - let parts: Vec<&str> = normalized.splitn(2, '@').collect(); - if parts.len() == 2 && parts[1] == data.domain { - return self.follow_local(local_user_id, parts[0], &data).await; - } - - let remote_actor: DbActor = Self::webfinger_https(handle, &data).await?; - - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let follow_id_str = follow_id.to_string(); - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: ObjectId::from(remote_actor.ap_id.clone()), - }; - let follow_with_ctx = WithContext::new_default(follow); - - let sends = SendActivityTask::prepare( - &follow_with_ctx, - &local_actor, - vec![remote_actor.inbox()], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some activity deliveries failed permanently" - ); - } - - let domain = remote_actor.ap_id.host_str().unwrap_or(""); - let full_handle = format!("{}@{}", remote_actor.username, domain); - let remote = RemoteActor { - url: remote_actor.ap_id.to_string(), - handle: full_handle, - inbox_url: remote_actor.inbox_url.to_string(), - shared_inbox_url: remote_actor - .shared_inbox_url - .as_ref() - .map(|u| u.to_string()), - display_name: Some(remote_actor.username.clone()), - avatar_url: remote_actor.avatar_url.as_ref().map(|u| u.to_string()), - outbox_url: Some(remote_actor.outbox_url.to_string()), - }; - data.federation_repo - .add_following(local_user_id, remote, &follow_id_str) - .await?; - - Ok(()) - } - - pub async fn unfollow( - &self, - local_user_id: uuid::Uuid, - actor_url_str: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - if actor_url_str.starts_with(&self.base_url) { - return self - .unfollow_local(local_user_id, actor_url_str, &data) - .await; - } - - let remote = data - .federation_repo - .get_remote_actor(actor_url_str) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; - - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_ap_id = Url::parse(actor_url_str)?; - let inbox = Url::parse(&remote.inbox_url)?; - - let follow_activity_id_str = data - .federation_repo - .get_follow_activity_id(local_user_id, actor_url_str) - .await?; - let follow_id = match follow_activity_id_str { - Some(id) => Url::parse(&id)?, - None => activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - }; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: ObjectId::from(remote_ap_id), - }; - - let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let undo = UndoActivity { - id: undo_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::to_value(&follow).map_err(|e| anyhow::anyhow!("{e}"))?, - }; - - let sends = SendActivityTask::prepare( - &WithContext::new_default(undo), - &local_actor, - vec![inbox], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some activity deliveries failed permanently" - ); - } - - data.federation_repo - .remove_following(local_user_id, actor_url_str) - .await?; - - data.object_handler - .on_actor_removed(&Url::parse(actor_url_str)?) - .await?; - - Ok(()) - } - - pub async fn accept_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_actor = data - .federation_repo - .get_remote_actor(remote_actor_url) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - - let follow_id_str = data - .federation_repo - .get_follower_follow_activity_id(local_user_id, remote_actor_url) - .await? - .ok_or_else(|| { - anyhow::anyhow!("follow activity id not found for {}", remote_actor_url) - })?; - let follow_id = Url::parse(&follow_id_str)?; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(Url::parse(remote_actor_url)?), - object: ObjectId::from(local_actor.ap_id.clone()), - }; - let accept = AcceptActivity { - id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: follow, - }; - - data.federation_repo - .update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted) - .await?; - - let inbox = Url::parse(&remote_actor.inbox_url)?; - let sends = SendActivityTask::prepare( - &WithContext::new_default(accept), - &local_actor, - vec![inbox.clone()], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - "failed to deliver Accept activity, but follower is marked accepted locally" - ); - } - - let target_inbox = remote_actor - .shared_inbox_url - .clone() - .unwrap_or_else(|| remote_actor.inbox_url.clone()); - self.spawn_backfill(local_user_id, target_inbox); - - Ok(()) - } - - pub async fn reject_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_actor = data - .federation_repo - .get_remote_actor(remote_actor_url) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - - let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(Url::parse(remote_actor_url)?), - object: ObjectId::from(local_actor.ap_id.clone()), - }; - let reject = RejectActivity { - id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: follow, - }; - - let inbox = Url::parse(&remote_actor.inbox_url)?; - let sends = SendActivityTask::prepare( - &WithContext::new_default(reject), - &local_actor, - vec![inbox], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some activity deliveries failed permanently" - ); - } - - data.federation_repo - .remove_follower(local_user_id, remote_actor_url) - .await?; - - Ok(()) - } - - pub async fn get_pending_followers( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .get_pending_followers(local_user_id) - .await - } - - pub async fn get_accepted_followers( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - let followers = data.federation_repo.get_followers(local_user_id).await?; - Ok(followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .map(|f| f.actor) - .collect()) - } - - pub async fn count_accepted_followers( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result { - let data = self.federation_config.to_request_data(); - let followers = data.federation_repo.get_followers(local_user_id).await?; - Ok(followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .count()) - } - - pub async fn get_following( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - data.federation_repo.get_following(local_user_id).await - } - - pub async fn count_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result { - let data = self.federation_config.to_request_data(); - data.federation_repo.count_following(local_user_id).await - } - - pub async fn remove_follower( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .remove_follower(local_user_id, actor_url) - .await - } - - /// Broadcast a Delete activity to all accepted followers for a removed review. - pub async fn broadcast_delete_to_followers( - &self, - local_user_id: uuid::Uuid, - ap_id: Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let delete_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let delete = crate::activities::DeleteActivity { - id: delete_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::json!(ap_id.to_string()), - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let delete_with_ctx = WithContext::new_default(delete); - let sends = - SendActivityTask::prepare(&delete_with_ctx, &local_actor, inboxes, &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some delete activity deliveries failed" - ); - } - Ok(()) - } - - /// Broadcast an Add(WatchlistObject) activity to all accepted followers. - pub async fn broadcast_add_to_followers( - &self, - local_user_id: uuid::Uuid, - ap_id: Url, - object: serde_json::Value, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let add = crate::activities::AddActivity { - id: ap_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let add_with_ctx = WithContext::new_default(add); - let sends = SendActivityTask::prepare(&add_with_ctx, &local_actor, inboxes, &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Add deliveries failed"); - } - Ok(()) - } - - /// Broadcast an Undo(Add) activity to all accepted followers. - pub async fn broadcast_undo_add_to_followers( - &self, - local_user_id: uuid::Uuid, - watchlist_entry_ap_id: Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let undo_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let undo = crate::activities::UndoActivity { - id: undo_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::json!({ - "type": "Add", - "id": watchlist_entry_ap_id.as_str(), - "object": { "id": watchlist_entry_ap_id.as_str() } - }), - }; - let undo_with_ctx = WithContext::new_default(undo); - let sends = SendActivityTask::prepare(&undo_with_ctx, &local_actor, inboxes, &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Undo(Add) deliveries failed"); - } - Ok(()) - } - - pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> { - use activitypub_federation::traits::Object; - - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let person = local_actor - .clone() - .into_json(&data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - // Wrap with @context so Mastodon's JSON-LD processor can resolve field names. - let person_json = serde_json::to_value(WithContext::new_default(person))?; - - let update_id = Url::parse(&format!( - "{}/activities/update/{}", - self.base_url, - uuid::Uuid::new_v4() - ))?; - - let update = UpdateActivity { - id: update_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: person_json, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - - let followers = data.federation_repo.get_followers(user_id).await?; - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .collect(); - - if accepted.is_empty() { - tracing::info!(user_id = %user_id, "no accepted followers, skipping actor update broadcast"); - return Ok(()); - } - - let inboxes = collect_inboxes(&accepted); - tracing::info!( - user_id = %user_id, - follower_count = accepted.len(), - inbox_count = inboxes.len(), - inboxes = ?inboxes, - "broadcasting actor update" - ); - - let sends = SendActivityTask::prepare( - &WithContext::new_default(update), - &local_actor, - inboxes, - &data, - ) - .await?; - - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - return Err(anyhow::anyhow!( - "actor update delivery failed for {} inbox(es): {}", - failures.len(), - failures - .iter() - .map(|e| e.to_string()) - .collect::>() - .join("; ") - )); - } - tracing::info!(user_id = %user_id, "actor update broadcast complete"); - Ok(()) - } - - pub async fn block_actor( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - data.federation_repo - .add_blocked_actor(local_user_id, actor_url) - .await?; - let _ = data - .federation_repo - .remove_follower(local_user_id, actor_url) - .await; - let _ = data - .federation_repo - .remove_following(local_user_id, actor_url) - .await; - - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - if let Ok(Some(remote_actor)) = data.federation_repo.get_remote_actor(actor_url).await { - let block_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let block = crate::activities::BlockActivity { - id: block_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: Url::parse(actor_url)?, - }; - let inbox = Url::parse(&remote_actor.inbox_url)?; - let sends = SendActivityTask::prepare( - &WithContext::new_default(block), - &local_actor, - vec![inbox], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(actor = %actor_url, "failed to deliver Block activity"); - } - } - - Ok(()) - } - - pub async fn unblock_actor( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .remove_blocked_actor(local_user_id, actor_url) - .await - } - - pub async fn get_blocked_actors( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - let actor_urls = data - .federation_repo - .get_blocked_actors(local_user_id) - .await?; - let mut actors = Vec::new(); - for url in actor_urls { - let actor = match data.federation_repo.get_remote_actor(&url).await { - Ok(Some(a)) => a, - _ => RemoteActor { - url: url.clone(), - handle: url.clone(), - inbox_url: url.clone(), - shared_inbox_url: None, - display_name: None, - avatar_url: None, - outbox_url: None, - }, - }; - actors.push(actor); - } - Ok(actors) - } - - pub async fn add_blocked_domain( - &self, - domain: &str, - reason: Option<&str>, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .add_blocked_domain(domain, reason) - .await - } - - pub async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo.remove_blocked_domain(domain).await - } - - pub async fn get_blocked_domains(&self) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - data.federation_repo.get_blocked_domains().await - } - - async fn follow_local( - &self, - local_user_id: uuid::Uuid, - target_username: &str, - data: &activitypub_federation::config::Data, - ) -> anyhow::Result<()> { - let target = data - .user_repo - .find_by_username(target_username) - .await? - .ok_or_else(|| anyhow::anyhow!("user not found: {}", target_username))?; - - if target.id == local_user_id { - return Err(anyhow::anyhow!("cannot follow yourself")); - } - - let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); - let target_actor_url = crate::urls::actor_url(&self.base_url, target.id); - let target_inbox_url = format!("{}/inbox", target_actor_url); - let follow_id = activity_url(&self.base_url) - .map_err(|e| anyhow::anyhow!("{e}"))? - .to_string(); - - data.federation_repo - .add_follower( - target.id, - &follower_actor_url, - FollowerStatus::Accepted, - &follow_id, - ) - .await?; - - let target_as_remote = RemoteActor { - url: target_actor_url.to_string(), - handle: format!("{}@{}", target.username, data.domain), - inbox_url: target_inbox_url, - shared_inbox_url: None, - display_name: Some(target.username), - avatar_url: None, - outbox_url: None, - }; - data.federation_repo - .add_following(local_user_id, target_as_remote, &follow_id) - .await?; - - data.federation_repo - .update_following_status( - local_user_id, - target_actor_url.as_ref(), - FollowingStatus::Accepted, - ) - .await?; - - tracing::info!(follower = %local_user_id, followee = %target.id, "local follow"); - Ok(()) - } - - async fn unfollow_local( - &self, - local_user_id: uuid::Uuid, - target_actor_url: &str, - data: &activitypub_federation::config::Data, - ) -> anyhow::Result<()> { - let target_url = Url::parse(target_actor_url)?; - let target_user_id = crate::urls::extract_user_id_from_url(&target_url) - .ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?; - - let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); - - data.federation_repo - .remove_follower(target_user_id, &local_actor_url) - .await?; - data.federation_repo - .remove_following(local_user_id, target_actor_url) - .await?; - - tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow"); - Ok(()) - } - - pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS)) - .build()?; - let data = self.federation_config.to_request_data(); - let actor = url::Url::parse(actor_url)?; - - let root: serde_json::Value = client - .get(outbox_url) - .header("Accept", "application/activity+json") - .send() - .await? - .json() - .await?; - - let first = match root.get("first").and_then(|v| v.as_str()) { - Some(url) => url.to_string(), - None => { - tracing::debug!(outbox = %outbox_url, "outbox has no first page, nothing to backfill"); - return Ok(()); - } - }; - - let mut current_url = first; - let mut visited = std::collections::HashSet::new(); - - loop { - if !visited.insert(current_url.clone()) { - tracing::warn!(url = %current_url, "backfill: loop detected, stopping"); - break; - } - - let page: serde_json::Value = match client - .get(¤t_url) - .header("Accept", "application/activity+json") - .send() - .await - { - Ok(resp) => match resp.json().await { - Ok(v) => v, - Err(e) => { - tracing::error!(error = %e, url = %current_url, "backfill: failed to parse page JSON"); - break; - } - }, - Err(e) => { - tracing::error!(error = %e, url = %current_url, "backfill: HTTP request failed"); - break; - } - }; - - if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) { - for item in items { - let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); - if activity_type != "Create" && activity_type != "Add" { - continue; - } - let object = match item.get("object") { - Some(o) if o.is_object() => o.clone(), - _ => continue, - }; - let ap_id = match object - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| url::Url::parse(s).ok()) - { - Some(u) => u, - None => continue, - }; - if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await { - tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item, skipping"); - } - } - } - - match page.get("next").and_then(|v| v.as_str()) { - Some(next) => current_url = next.to_string(), - None => break, - } - } - - tracing::info!(outbox = %outbox_url, pages = visited.len(), "backfill complete"); - Ok(()) - } - - fn adapter_actor_to_domain( - a: crate::repository::RemoteActor, - ) -> domain::models::remote_actor::RemoteActor { - domain::models::remote_actor::RemoteActor { - url: a.url, - handle: a.handle, - display_name: a.display_name, - avatar_url: a.avatar_url, - outbox_url: a.outbox_url, - last_fetched_at: chrono::Utc::now(), - bio: None, - banner_url: None, - also_known_as: None, - followers_url: None, - following_url: None, - attachment: vec![], - } - } - - fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { - let config = self.federation_config.clone(); - let base_url = self.base_url.clone(); - tokio::spawn(async move { - if let Err(e) = ActivityPubService::run_backfill( - config, - base_url, - owner_user_id, - follower_inbox_url, - ) - .await - { - tracing::warn!(error = %e, "backfill: task failed"); - } - }); - } - - async fn run_backfill( - config: ApFederationConfig, - base_url: String, - owner_user_id: uuid::Uuid, - follower_inbox_url: String, - ) -> anyhow::Result<()> { - const BATCH_SIZE: usize = 20; - - let data = config.to_request_data(); - let local_actor = get_local_actor(owner_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let inbox = Url::parse(&follower_inbox_url)?; - - let mut objects = data - .object_handler - .get_local_objects_for_user(owner_user_id) - .await?; - objects.reverse(); // oldest first → chronological feed - - let total = objects.len(); - let mut success_count = 0usize; - let mut failure_count = 0usize; - - for chunk in objects.chunks(BATCH_SIZE) { - for (ap_id, object_json) in chunk { - // Use a stable Create activity ID derived from the object's ap_id - let create_id = Url::parse(&format!( - "{}/activities/create/{}", - base_url, - uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes()) - ))?; - - let create = CreateActivity { - id: create_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: object_json.clone(), - to: vec![], - cc: vec![], - bto: vec![], - bcc: vec![], - }; - - let sends = SendActivityTask::prepare( - &WithContext::new_default(create), - &local_actor, - vec![inbox.clone()], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if failures.is_empty() { - success_count += 1; - } else { - failure_count += 1; - } - } - tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS)).await; - } - - tracing::info!( - user_id = %owner_user_id, - follower = %follower_inbox_url, - sent = success_count, - failed = failure_count, - total = total, - "backfill complete" - ); - Ok(()) - } -} - -#[async_trait::async_trait] -impl crate::ap_ports::OutboundFederationPort for ActivityPubService { - // Actor identity (ap_id, followers_url) comes from federation config via get_local_actor. - // author_username is provided by the caller but not needed here. - async fn broadcast_create( - &self, - author_user_id: &domain::value_objects::UserId, - thought: &domain::models::thought::Thought, - _author_username: &str, - in_reply_to_url: Option<&str>, - ) -> Result<(), domain::errors::DomainError> { - let user_uuid = author_user_id.as_uuid(); - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, user_uuid) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))? - else { - return Ok(()); - }; - - let (ap_id, note) = - thought_note_json(thought, &local_actor, &self.base_url, in_reply_to_url) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - - let create = crate::activities::CreateActivity { - id: ap_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: note, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - bto: vec![], - bcc: vec![], - }; - let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(create), - &local_actor, - inboxes, - &data, - ) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Create deliveries failed"); - } - Ok(()) - } - - async fn broadcast_delete( - &self, - author_user_id: &domain::value_objects::UserId, - thought_ap_id: &str, - ) -> Result<(), domain::errors::DomainError> { - let user_uuid = author_user_id.as_uuid(); - let ap_id = url::Url::parse(thought_ap_id) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - self.broadcast_delete_to_followers(user_uuid, ap_id) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) - } - - // Actor identity (ap_id, followers_url) comes from federation config via get_local_actor. - // author_username is provided by the caller but not needed here. - async fn broadcast_update( - &self, - author_user_id: &domain::value_objects::UserId, - thought: &domain::models::thought::Thought, - _author_username: &str, - in_reply_to_url: Option<&str>, - ) -> Result<(), domain::errors::DomainError> { - let user_uuid = author_user_id.as_uuid(); - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, user_uuid) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))? - else { - return Ok(()); - }; - - let (_ap_id, note) = - thought_note_json(thought, &local_actor, &self.base_url, in_reply_to_url) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - - let update_id = url::Url::parse(&format!( - "{}/activities/update/{}", - self.base_url, - uuid::Uuid::new_v4() - )) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let update = crate::activities::UpdateActivity { - id: update_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: note, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(update), - &local_actor, - inboxes, - &data, - ) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Update deliveries failed"); - } - Ok(()) - } - - async fn broadcast_announce( - &self, - booster_user_id: &domain::value_objects::UserId, - object_ap_id: &str, - ) -> Result<(), domain::errors::DomainError> { - let user_uuid = booster_user_id.as_uuid(); - let ap_id = url::Url::parse(object_ap_id) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - self.broadcast_announce_to_followers(user_uuid, ap_id) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) - } - - async fn broadcast_undo_announce( - &self, - booster_user_id: &domain::value_objects::UserId, - object_ap_id: &str, - ) -> Result<(), domain::errors::DomainError> { - let user_uuid = booster_user_id.as_uuid(); - let ap_id = url::Url::parse(object_ap_id) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - self.broadcast_undo_announce_to_followers(user_uuid, ap_id) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) - } - - async fn broadcast_like( - &self, - liker_user_id: &domain::value_objects::UserId, - object_ap_id: &str, - author_inbox_url: &str, - ) -> Result<(), domain::errors::DomainError> { - let object = url::Url::parse(object_ap_id) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let inbox = url::Url::parse(author_inbox_url) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - self.broadcast_like_to_inbox(liker_user_id.as_uuid(), object, inbox) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) - } - - async fn broadcast_undo_like( - &self, - liker_user_id: &domain::value_objects::UserId, - object_ap_id: &str, - author_inbox_url: &str, - ) -> Result<(), domain::errors::DomainError> { - let object = url::Url::parse(object_ap_id) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let inbox = url::Url::parse(author_inbox_url) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - self.broadcast_undo_like_to_inbox(liker_user_id.as_uuid(), object, inbox) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) - } - - async fn broadcast_actor_update( - &self, - user_id: &domain::value_objects::UserId, - ) -> Result<(), domain::errors::DomainError> { - self.broadcast_actor_update(user_id.as_uuid()) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) - } -} - -#[async_trait::async_trait] -impl domain::ports::FederationSchedulerPort for ActivityPubService { - async fn schedule_actor_posts_fetch( - &self, - actor_ap_url: &str, - outbox_url: &str, - ) -> Result<(), domain::errors::DomainError> { - let service = self.clone(); - let actor = actor_ap_url.to_string(); - let outbox = outbox_url.to_string(); - tokio::spawn(async move { - if let Err(e) = service.backfill_outbox(&outbox, &actor).await { - tracing::warn!(actor = %actor, error = %e, "posts backfill failed"); - } - }); - Ok(()) - } - - async fn schedule_connections_fetch( - &self, - actor_ap_url: &str, - collection_url: &str, - connection_type: &str, - page: u32, - ) -> Result<(), domain::errors::DomainError> { - // Only trigger a full fetch on page 1 to avoid redundant network traffic. - if page != 1 { - return Ok(()); - } - let service = self.clone(); - let actor = actor_ap_url.to_string(); - let collection = collection_url.to_string(); - let conn_type = connection_type.to_string(); - let connections_repo = self.connections_repo.clone(); - tokio::spawn(async move { - let client = match reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS)) - .build() - { - Ok(c) => c, - Err(e) => { - tracing::warn!(error = %e, "connections fetch: failed to build client"); - return; - } - }; - - // Walk the AP collection, following first/next links. - let mut all_urls: Vec = Vec::new(); - let mut current_url: Option = Some(collection.clone()); - const MAX_ACTORS: usize = 500; - - while let Some(url) = current_url.take() { - let val: serde_json::Value = match client - .get(&url) - .header("Accept", "application/activity+json, application/ld+json") - .send() - .await - { - Ok(r) => match r.json().await { - Ok(v) => v, - Err(e) => { - tracing::warn!(error = %e, url = %url, "connections: parse error"); - break; - } - }, - Err(e) => { - tracing::warn!(error = %e, url = %url, "connections: HTTP error"); - break; - } - }; - - // OrderedCollection root — follow its `first` page. - if val["type"].as_str() == Some("OrderedCollection") { - current_url = val["first"].as_str().map(|s| s.to_string()); - continue; - } - - // Collect actor URLs from orderedItems (string or {id: ...}). - let empty = vec![]; - let items = val["orderedItems"].as_array().unwrap_or(&empty); - for item in items { - let actor_url = item.as_str().or_else(|| item["id"].as_str()).unwrap_or(""); - if !actor_url.is_empty() { - all_urls.push(actor_url.to_string()); - } - } - - if all_urls.len() >= MAX_ACTORS { - break; - } - current_url = val["next"].as_str().map(|s| s.to_string()); - if current_url.is_some() { - tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS)) - .await; - } - } - - if all_urls.is_empty() { - tracing::debug!(actor = %actor, connection_type = %conn_type, "connections: empty collection"); - return; - } - - // Resolve profiles and cache in pages of PAGE_SIZE. - const PAGE_SIZE: usize = 20; - for (idx, chunk) in all_urls.chunks(PAGE_SIZE).enumerate() { - let page_num = (idx + 1) as u32; - let chunk_urls: Vec = chunk.to_vec(); - let resolved = service.resolve_actor_profiles(chunk_urls).await; - if let Err(e) = connections_repo - .upsert_connections(&actor, &conn_type, page_num, &resolved) - .await - { - tracing::warn!(error = %e, "connections: upsert failed"); - } - } - - tracing::debug!( - actor = %actor, - connection_type = %conn_type, - count = all_urls.len(), - "connections fetch complete" - ); - }); - Ok(()) - } -} - -#[async_trait::async_trait] -impl domain::ports::FederationLookupPort for ActivityPubService { - async fn lookup_actor( - &self, - handle: &str, - ) -> Result { - use activitypub_federation::fetch::object_id::ObjectId; - - let normalized = handle.trim_start_matches('@'); - let at = normalized.rfind('@').ok_or_else(|| { - domain::errors::DomainError::InvalidInput("handle must be user@domain".into()) - })?; - let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]); - - // Fetch WebFinger over HTTPS directly — the library's webfinger_resolve_actor - // tries HTTP first in debug mode, which fails on servers without HTTP→HTTPS redirect. - let wf_url = format!( - "https://{}/.well-known/webfinger?resource=acct:{}@{}", - domain_str, user, domain_str - ); - let wf: serde_json::Value = reqwest::Client::new() - .get(&wf_url) - .header("Accept", "application/jrd+json, application/json") - .send() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))? - .json() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - - let self_href = wf["links"] - .as_array() - .and_then(|links| { - links.iter().find(|l| { - l["rel"].as_str() == Some("self") - && l["type"].as_str() == Some("application/activity+json") - }) - }) - .and_then(|l| l["href"].as_str()) - .ok_or(domain::errors::DomainError::NotFound)?; - - let self_url = url::Url::parse(self_href) - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - - let data = self.federation_config.to_request_data(); - let actor: crate::actors::DbActor = ObjectId::from(self_url) - .dereference(&data) - .await - .map_err(|e: crate::error::Error| { - domain::errors::DomainError::ExternalService(e.to_string()) - })?; - - let domain_str = actor.ap_id.host_str().unwrap_or(""); - let full_handle = format!("{}@{}", actor.username, domain_str); - - Ok(domain::models::remote_actor::RemoteActor { - url: actor.ap_id.to_string(), - handle: full_handle, - display_name: Some(actor.username.clone()), - avatar_url: actor.avatar_url.as_ref().map(|u| u.to_string()), - last_fetched_at: actor.last_refreshed_at, - bio: actor.bio.clone(), - banner_url: actor.banner_url.as_ref().map(|u| u.to_string()), - also_known_as: actor.also_known_as.clone(), - outbox_url: Some(actor.outbox_url.to_string()), - followers_url: Some(actor.followers_url.to_string()), - following_url: Some(actor.following_url.to_string()), - attachment: actor - .attachment - .iter() - .map(|f| (f.name.clone(), f.value.clone())) - .collect(), - }) - } - - async fn actor_json( - &self, - user_id: &domain::value_objects::UserId, - ) -> Result { - ActivityPubService::actor_json(self, &user_id.as_uuid().to_string()) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } - - async fn followers_collection_json( - &self, - user_id: &domain::value_objects::UserId, - page: Option, - ) -> Result { - let data = self.federation_config.to_request_data(); - let uuid = user_id.as_uuid(); - let collection_id = format!("{}/users/{}/followers", self.base_url, uuid); - let total = data - .federation_repo - .count_followers(uuid) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - let obj = if let Some(p) = page { - let p = p.max(1); - let offset = (p.saturating_sub(1) as usize) * crate::urls::AP_PAGE_SIZE; - let followers = data - .federation_repo - .get_followers_page(uuid, offset as u32, crate::urls::AP_PAGE_SIZE) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - let has_next = offset + followers.len() < total; - let items: Vec = followers.into_iter().map(|f| f.actor.url).collect(); - let mut obj = serde_json::json!({ - "@context": crate::urls::AP_CONTEXT, - "type": "OrderedCollectionPage", - "id": format!("{}?page={}", collection_id, p), - "partOf": collection_id, - "totalItems": total, - "orderedItems": items, - }); - if has_next { - obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1)); - } - obj - } else { - serde_json::json!({ - "@context": crate::urls::AP_CONTEXT, - "type": "OrderedCollection", - "id": collection_id, - "totalItems": total, - "first": format!("{}?page=1", collection_id), - }) - }; - serde_json::to_string(&obj) - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } - - async fn following_collection_json( - &self, - user_id: &domain::value_objects::UserId, - page: Option, - ) -> Result { - let data = self.federation_config.to_request_data(); - let uuid = user_id.as_uuid(); - let collection_id = format!("{}/users/{}/following", self.base_url, uuid); - let total = data - .federation_repo - .count_following(uuid) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - let obj = if let Some(p) = page { - let p = p.max(1); - let offset = (p.saturating_sub(1) as usize) * crate::urls::AP_PAGE_SIZE; - let following = data - .federation_repo - .get_following_page(uuid, offset as u32, crate::urls::AP_PAGE_SIZE) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - let has_next = offset + following.len() < total; - let items: Vec = following.into_iter().map(|a| a.url).collect(); - let mut obj = serde_json::json!({ - "@context": crate::urls::AP_CONTEXT, - "type": "OrderedCollectionPage", - "id": format!("{}?page={}", collection_id, p), - "partOf": collection_id, - "totalItems": total, - "orderedItems": items, - }); - if has_next { - obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1)); - } - obj - } else { - serde_json::json!({ - "@context": crate::urls::AP_CONTEXT, - "type": "OrderedCollection", - "id": collection_id, - "totalItems": total, - "first": format!("{}?page=1", collection_id), - }) - }; - serde_json::to_string(&obj) - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } -} - -#[async_trait::async_trait] -impl domain::ports::FederationFetchPort for ActivityPubService { - async fn fetch_outbox_page( - &self, - outbox_url: &str, - page: u32, - ) -> Result, domain::errors::DomainError> { - use chrono::DateTime; - - // Fetch the base outbox to find the real first-page URL. - // Mastodon uses ?page=true; other servers may use ?page=1 or a different param. - let client = reqwest::Client::new(); - let base: serde_json::Value = client - .get(outbox_url) - .header("Accept", "application/activity+json, application/ld+json") - .send() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))? - .json() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - - // Prefer the `first` link from the OrderedCollection; fall back to ?page=1. - let url = base["first"] - .as_str() - .map(|s| s.to_string()) - .unwrap_or_else(|| format!("{}?page={}", outbox_url, page)); - - let resp: serde_json::Value = client - .get(&url) - .header("Accept", "application/activity+json, application/ld+json") - .send() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))? - .json() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - - let empty = vec![]; - let items = resp["orderedItems"].as_array().unwrap_or(&empty); - - let notes = items - .iter() - .filter_map(|item| { - // Items are Create activities wrapping a Note, or Notes directly - let note = if item["type"].as_str() == Some("Create") { - &item["object"] - } else if item["type"].as_str() == Some("Note") { - item - } else { - return None; - }; - - // Only public notes - let to = note["to"].as_array()?; - let is_public = to - .iter() - .any(|t| t.as_str() == Some("https://www.w3.org/ns/activitystreams#Public")); - if !is_public { - return None; - } - - let published = DateTime::parse_from_rfc3339(note["published"].as_str()?) - .ok()? - .with_timezone(&chrono::Utc); - - let text = note["content"].as_str().unwrap_or("").to_string(); - let has_attachments = note["attachment"] - .as_array() - .map(|a| !a.is_empty()) - .unwrap_or(false); - - let content = if has_attachments { - let notice = - "

📎 Media attachment — not supported

"; - if text.is_empty() { - notice.to_string() - } else { - format!("{text}{notice}") - } - } else { - text - }; - - Some(domain::models::remote_note::RemoteNote { - ap_id: note["id"].as_str()?.to_string(), - content, - published, - sensitive: note["sensitive"].as_bool().unwrap_or(false), - content_warning: note["summary"].as_str().map(|s| s.to_string()), - }) - }) - .collect(); - - Ok(notes) - } - - async fn fetch_actor_urls_from_collection( - &self, - collection_url: &str, - ) -> Result, domain::errors::DomainError> { - let client = reqwest::Client::new(); - let base: serde_json::Value = client - .get(collection_url) - .header("Accept", "application/activity+json, application/ld+json") - .send() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))? - .json() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - - // Base collections typically have no orderedItems — follow the `first` page link. - let page = if base["orderedItems"].is_null() { - if let Some(first_url) = base["first"].as_str() { - client - .get(first_url) - .header("Accept", "application/activity+json, application/ld+json") - .send() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))? - .json() - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))? - } else { - base - } - } else { - base - }; - - let empty = vec![]; - let items = page["orderedItems"].as_array().unwrap_or(&empty); - Ok(items - .iter() - .filter_map(|v| v.as_str().map(|s| s.to_string())) - .collect()) - } - - async fn resolve_actor_profiles( - &self, - urls: Vec, - ) -> Vec { - use futures::future; - - async fn fetch_one( - url: String, - ) -> Option { - let resp: serde_json::Value = tokio::time::timeout( - std::time::Duration::from_secs(5), - reqwest::Client::new() - .get(&url) - .header("Accept", "application/activity+json") - .send(), - ) - .await - .ok()? - .ok()? - .json() - .await - .ok()?; - - let ap_url = resp["id"].as_str()?.to_string(); - let preferred_username = resp["preferredUsername"].as_str().unwrap_or("").to_string(); - let domain_str = url::Url::parse(&ap_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - let handle = format!("{}@{}", preferred_username, domain_str); - let display_name = resp["name"].as_str().map(|s| s.to_string()); - let avatar_url = resp["icon"]["url"].as_str().map(|s| s.to_string()); - - Some( - domain::models::actor_connection_summary::ActorConnectionSummary { - url: ap_url, - handle, - display_name, - avatar_url, - }, - ) - } - - let futs: Vec<_> = urls.into_iter().map(fetch_one).collect(); - let results = future::join_all(futs).await; - - results - .into_iter() - .filter_map(|r| { - if r.is_none() { - tracing::warn!("failed to resolve actor profile (timeout or parse error)"); - } - r - }) - .collect() - } -} - -#[async_trait::async_trait] -impl domain::ports::FederationFollowPort for ActivityPubService { - async fn follow_remote( - &self, - local_user_id: &domain::value_objects::UserId, - handle: &str, - ) -> Result<(), domain::errors::DomainError> { - self.follow(local_user_id.as_uuid(), handle) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } - - async fn unfollow_remote( - &self, - local_user_id: &domain::value_objects::UserId, - handle: &str, - ) -> Result<(), domain::errors::DomainError> { - let data = self.federation_config.to_request_data(); - let remote_actor: DbActor = Self::webfinger_https(handle, &data) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; - let actor_url = remote_actor.ap_id.to_string(); - self.unfollow(local_user_id.as_uuid(), &actor_url) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } - - async fn get_remote_following( - &self, - user_id: &domain::value_objects::UserId, - ) -> Result, domain::errors::DomainError> { - self.get_following(user_id.as_uuid()) - .await - .map(|v| v.into_iter().map(Self::adapter_actor_to_domain).collect()) - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } -} - -#[async_trait::async_trait] -impl domain::ports::FederationFollowRequestPort for ActivityPubService { - async fn get_pending_followers( - &self, - user_id: &domain::value_objects::UserId, - ) -> Result, domain::errors::DomainError> { - self.get_pending_followers(user_id.as_uuid()) - .await - .map(|v| v.into_iter().map(Self::adapter_actor_to_domain).collect()) - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } - - async fn accept_follow_request( - &self, - user_id: &domain::value_objects::UserId, - actor_url: &str, - ) -> Result<(), domain::errors::DomainError> { - self.accept_follower(user_id.as_uuid(), actor_url) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } - - async fn reject_follow_request( - &self, - user_id: &domain::value_objects::UserId, - actor_url: &str, - ) -> Result<(), domain::errors::DomainError> { - self.reject_follower(user_id.as_uuid(), actor_url) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } - - async fn get_remote_followers( - &self, - user_id: &domain::value_objects::UserId, - ) -> Result, domain::errors::DomainError> { - self.get_accepted_followers(user_id.as_uuid()) - .await - .map(|v| v.into_iter().map(Self::adapter_actor_to_domain).collect()) - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } - - async fn remove_remote_follower( - &self, - user_id: &domain::value_objects::UserId, - actor_url: &str, - ) -> Result<(), domain::errors::DomainError> { - self.remove_follower(user_id.as_uuid(), actor_url) - .await - .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string())) - } -} - -#[cfg(test)] -#[path = "tests/service.rs"] -mod tests; diff --git a/crates/adapters/activitypub-base/src/tests/actors.rs b/crates/adapters/activitypub-base/src/tests/actors.rs deleted file mode 100644 index 7f510c4..0000000 --- a/crates/adapters/activitypub-base/src/tests/actors.rs +++ /dev/null @@ -1,49 +0,0 @@ -use super::*; - -#[test] -fn person_serializes_with_enriched_fields() { - let person = Person { - kind: Default::default(), - id: "https://example.com/users/1" - .parse::() - .unwrap() - .into(), - preferred_username: "alice".to_string(), - inbox: "https://example.com/users/1/inbox".parse().unwrap(), - outbox: "https://example.com/users/1/outbox".parse().unwrap(), - followers: "https://example.com/users/1/followers".parse().unwrap(), - following: "https://example.com/users/1/following".parse().unwrap(), - public_key: PublicKey { - id: "https://example.com/users/1#main-key".to_string(), - owner: "https://example.com/users/1".parse().unwrap(), - public_key_pem: "pem".to_string(), - }, - name: Some("Alice".to_string()), - summary: Some("Bio text".to_string()), - icon: Some(ApImageObject { - kind: "Image".to_string(), - url: "https://example.com/images/avatars/1".parse().unwrap(), - }), - url: Some("https://example.com/u/alice".parse().unwrap()), - discoverable: Some(true), - manually_approves_followers: true, - updated: Some(Utc::now()), - endpoints: Some(Endpoints { - shared_inbox: "https://example.com/inbox".parse().unwrap(), - }), - image: None, - also_known_as: vec![], - attachment: vec![], - }; - let json = serde_json::to_value(&person).unwrap(); - assert_eq!(json["discoverable"], true); - assert_eq!(json["summary"], "Bio text"); - assert_eq!(json["icon"]["type"], "Image"); - assert_eq!(json["manuallyApprovesFollowers"], true); - assert!(json.get("updated").is_some()); - assert!(json.get("endpoints").is_some()); - assert_eq!( - json["endpoints"]["sharedInbox"], - "https://example.com/inbox" - ); -} diff --git a/crates/adapters/activitypub-base/src/tests/nodeinfo.rs b/crates/adapters/activitypub-base/src/tests/nodeinfo.rs deleted file mode 100644 index 898e1bf..0000000 --- a/crates/adapters/activitypub-base/src/tests/nodeinfo.rs +++ /dev/null @@ -1,40 +0,0 @@ -use super::*; - -#[test] -fn nodeinfo_well_known_serializes_correctly() { - let doc = NodeInfoWellKnown { - links: vec![NodeInfoLink { - rel: "http://nodeinfo.diaspora.software/ns/schema/2.0".to_string(), - href: "https://example.com/nodeinfo/2.0".to_string(), - }], - }; - let json = serde_json::to_value(&doc).unwrap(); - assert_eq!( - json["links"][0]["rel"], - "http://nodeinfo.diaspora.software/ns/schema/2.0" - ); - assert_eq!(json["links"][0]["href"], "https://example.com/nodeinfo/2.0"); -} - -#[test] -fn nodeinfo_serializes_camel_case() { - let doc = NodeInfo { - version: "2.0".to_string(), - software: NodeInfoSoftware { - name: "my-app".to_string(), - version: "0.1.0".to_string(), - }, - protocols: vec!["activitypub".to_string()], - usage: NodeInfoUsage { - users: NodeInfoUsers { total: 3 }, - local_posts: 42, - }, - open_registrations: false, - }; - let json = serde_json::to_value(&doc).unwrap(); - assert_eq!(json["version"], "2.0"); - assert_eq!(json["software"]["name"], "my-app"); - assert_eq!(json["usage"]["users"]["total"], 3); - assert_eq!(json["usage"]["localPosts"], 42); - assert_eq!(json["openRegistrations"], false); -} diff --git a/crates/adapters/activitypub-base/src/tests/service.rs b/crates/adapters/activitypub-base/src/tests/service.rs deleted file mode 100644 index 0f8b034..0000000 --- a/crates/adapters/activitypub-base/src/tests/service.rs +++ /dev/null @@ -1,75 +0,0 @@ -fn _assert_impl_federation_lookup_port() -where - crate::service::ActivityPubService: domain::ports::FederationLookupPort, -{ -} - -fn _assert_impl_federation_follow_port() -where - crate::service::ActivityPubService: domain::ports::FederationFollowPort, -{ -} - -fn _assert_impl_federation_follow_request_port() -where - crate::service::ActivityPubService: domain::ports::FederationFollowRequestPort, -{ -} - -fn _assert_impl_federation_fetch_port() -where - crate::service::ActivityPubService: domain::ports::FederationFetchPort, -{ -} - -fn _assert_impl_federation_action_port() -where - crate::service::ActivityPubService: domain::ports::FederationActionPort, -{ -} - -use super::*; -use crate::repository::{Follower, FollowerStatus, RemoteActor}; - -fn make_follower(inbox: &str, shared: Option<&str>) -> Follower { - Follower { - actor: RemoteActor { - url: format!("https://remote/{}", inbox), - handle: "user".to_string(), - inbox_url: inbox.to_string(), - shared_inbox_url: shared.map(|s| s.to_string()), - display_name: None, - avatar_url: None, - outbox_url: None, - }, - status: FollowerStatus::Accepted, - } -} - -#[test] -fn collect_inboxes_deduplicates_shared() { - let followers = vec![ - make_follower( - "https://mastodon.social/users/a/inbox", - Some("https://mastodon.social/inbox"), - ), - make_follower( - "https://mastodon.social/users/b/inbox", - Some("https://mastodon.social/inbox"), - ), - make_follower("https://other.instance/users/c/inbox", None), - ]; - let inboxes = collect_inboxes(&followers); - assert_eq!(inboxes.len(), 2); - let strs: Vec<_> = inboxes.iter().map(|u| u.as_str()).collect(); - assert!(strs.contains(&"https://mastodon.social/inbox")); - assert!(strs.contains(&"https://other.instance/users/c/inbox")); -} - -#[test] -fn collect_inboxes_falls_back_to_individual_inbox() { - let followers = vec![make_follower("https://example.com/users/x/inbox", None)]; - let inboxes = collect_inboxes(&followers); - assert_eq!(inboxes.len(), 1); - assert_eq!(inboxes[0].as_str(), "https://example.com/users/x/inbox"); -} diff --git a/crates/adapters/activitypub-base/src/urls.rs b/crates/adapters/activitypub-base/src/urls.rs deleted file mode 100644 index 36bf9c8..0000000 --- a/crates/adapters/activitypub-base/src/urls.rs +++ /dev/null @@ -1,33 +0,0 @@ -use url::Url; - -use crate::error::Error; - -pub const AS_PUBLIC: &str = "https://www.w3.org/ns/activitystreams#Public"; -pub const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; -pub const AP_PAGE_SIZE: usize = 20; - -pub fn extract_user_id_from_url(url: &Url) -> Option { - let path = url.path(); - path.strip_prefix("/users/") - .and_then(|s| s.split('/').next()) - .and_then(|s| uuid::Uuid::parse_str(s).ok()) -} - -pub fn activity_url(base_url: &str) -> Result { - Url::parse(&format!("{}/activities/{}", base_url, uuid::Uuid::new_v4())) - .map_err(|e| Error::bad_request(anyhow::anyhow!(e))) -} - -pub fn actor_url(base_url: &str, user_id: uuid::Uuid) -> Url { - Url::parse(&format!("{}/users/{}", base_url, user_id)) - .expect("base_url is always a valid URL prefix") -} - -/// Extract the username segment from a /users/:username URL. -#[allow(dead_code)] -pub fn extract_username_from_url(url: &Url) -> Option { - url.path() - .strip_prefix("/users/") - .and_then(|s| s.split('/').next()) - .map(|s| s.to_string()) -} diff --git a/crates/adapters/activitypub-base/src/user.rs b/crates/adapters/activitypub-base/src/user.rs deleted file mode 100644 index a99092b..0000000 --- a/crates/adapters/activitypub-base/src/user.rs +++ /dev/null @@ -1,27 +0,0 @@ -use async_trait::async_trait; -use url::Url; - -#[derive(Debug, Clone)] -pub struct ApProfileField { - pub name: String, - pub value: String, -} - -#[derive(Debug, Clone)] -pub struct ApUser { - pub id: uuid::Uuid, - pub username: String, - pub bio: Option, - pub avatar_url: Option, - pub banner_url: Option, - pub also_known_as: Option, - pub profile_url: Option, - pub attachment: Vec, -} - -#[async_trait] -pub trait ApUserRepository: Send + Sync { - async fn find_by_id(&self, id: uuid::Uuid) -> anyhow::Result>; - async fn find_by_username(&self, username: &str) -> anyhow::Result>; - async fn count_users(&self) -> anyhow::Result; -} diff --git a/crates/adapters/activitypub-base/src/webfinger.rs b/crates/adapters/activitypub-base/src/webfinger.rs deleted file mode 100644 index 8754287..0000000 --- a/crates/adapters/activitypub-base/src/webfinger.rs +++ /dev/null @@ -1,38 +0,0 @@ -use activitypub_federation::{ - config::Data, - fetch::webfinger::{Webfinger, build_webfinger_response, extract_webfinger_name}, -}; -use axum::{ - extract::Query, - http::header, - response::{IntoResponse, Response}, -}; -use serde::Deserialize; - -use crate::data::FederationData; -use crate::error::Error; - -#[derive(Deserialize)] -pub struct WebfingerQuery { - resource: String, -} - -pub async fn webfinger_handler( - Query(query): Query, - data: Data, -) -> Result { - let name = extract_webfinger_name(&query.resource, &data)?; - - let user = data - .user_repo - .find_by_username(name) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let ap_id = crate::urls::actor_url(&data.base_url, user.id); - - let wf: Webfinger = build_webfinger_response(query.resource, ap_id); - let body = serde_json::to_string(&wf).map_err(|e| Error::from(anyhow::anyhow!(e)))?; - Ok(([(header::CONTENT_TYPE, "application/jrd+json")], body).into_response()) -} diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index 5f8d6bd..13ef1d1 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -1,7 +1,9 @@ pub mod handler; pub mod note; +pub mod port; pub mod urls; pub use handler::ThoughtsObjectHandler; pub use note::ThoughtNote; +pub use port::{AcceptNoteInput, ActivityPubRepository, ActorApUrls, OutboundFederationPort, OutboxEntry}; pub use urls::ThoughtsUrls; diff --git a/crates/adapters/activitypub-base/src/ap_ports.rs b/crates/adapters/activitypub/src/port.rs similarity index 100% rename from crates/adapters/activitypub-base/src/ap_ports.rs rename to crates/adapters/activitypub/src/port.rs