From ebc612a311378bea87f32c7beadbc1d274ac2984 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 10:15:34 +0200 Subject: [PATCH] feat(activitypub-base): copy from movies-diary with username-based actor URLs --- Cargo.toml | 4 +- crates/adapters/activitypub-base/Cargo.toml | 19 +- .../activitypub-base/src/activities.rs | 615 +++++++++ .../activitypub-base/src/actor_handler.rs | 25 + .../adapters/activitypub-base/src/actors.rs | 327 +++++ .../adapters/activitypub-base/src/content.rs | 47 + crates/adapters/activitypub-base/src/data.rs | 48 + crates/adapters/activitypub-base/src/error.rs | 48 + .../activitypub-base/src/federation.rs | 50 + .../activitypub-base/src/followers_handler.rs | 130 ++ crates/adapters/activitypub-base/src/inbox.rs | 18 + crates/adapters/activitypub-base/src/lib.rs | 27 + .../adapters/activitypub-base/src/nodeinfo.rs | 80 ++ .../adapters/activitypub-base/src/outbox.rs | 138 ++ .../activitypub-base/src/repository.rs | 134 ++ .../adapters/activitypub-base/src/service.rs | 1221 +++++++++++++++++ .../activitypub-base/src/tests/actors.rs | 49 + .../activitypub-base/src/tests/nodeinfo.rs | 40 + .../activitypub-base/src/tests/service.rs | 45 + crates/adapters/activitypub-base/src/urls.rs | 30 + crates/adapters/activitypub-base/src/user.rs | 27 + .../activitypub-base/src/webfinger.rs | 38 + 22 files changed, 3158 insertions(+), 2 deletions(-) create mode 100644 crates/adapters/activitypub-base/src/activities.rs create mode 100644 crates/adapters/activitypub-base/src/actor_handler.rs create mode 100644 crates/adapters/activitypub-base/src/actors.rs create mode 100644 crates/adapters/activitypub-base/src/content.rs create mode 100644 crates/adapters/activitypub-base/src/data.rs create mode 100644 crates/adapters/activitypub-base/src/error.rs create mode 100644 crates/adapters/activitypub-base/src/federation.rs create mode 100644 crates/adapters/activitypub-base/src/followers_handler.rs create mode 100644 crates/adapters/activitypub-base/src/inbox.rs create mode 100644 crates/adapters/activitypub-base/src/nodeinfo.rs create mode 100644 crates/adapters/activitypub-base/src/outbox.rs create mode 100644 crates/adapters/activitypub-base/src/repository.rs create mode 100644 crates/adapters/activitypub-base/src/service.rs create mode 100644 crates/adapters/activitypub-base/src/tests/actors.rs create mode 100644 crates/adapters/activitypub-base/src/tests/nodeinfo.rs create mode 100644 crates/adapters/activitypub-base/src/tests/service.rs create mode 100644 crates/adapters/activitypub-base/src/urls.rs create mode 100644 crates/adapters/activitypub-base/src/user.rs create mode 100644 crates/adapters/activitypub-base/src/webfinger.rs diff --git a/Cargo.toml b/Cargo.toml index 677432e..6fd391c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ thiserror = "2.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } async-trait = "0.1" -uuid = { version = "1.0", features = ["v4", "serde"] } +uuid = { version = "1.0", features = ["v4", "v5", "serde"] } chrono = { version = "0.4", features = ["serde"] } sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono", "macros"] } axum = { version = "0.8", features = ["macros"] } @@ -35,6 +35,8 @@ futures = "0.3" dotenvy = "0.15" async-nats = "0.38" async-stream = "0.3" +reqwest = { version = "0.13", features = ["json"] } +url = { version = "2", features = ["serde"] } domain = { path = "crates/domain" } application = { path = "crates/application" } diff --git a/crates/adapters/activitypub-base/Cargo.toml b/crates/adapters/activitypub-base/Cargo.toml index 9cf7bf7..e195664 100644 --- a/crates/adapters/activitypub-base/Cargo.toml +++ b/crates/adapters/activitypub-base/Cargo.toml @@ -1,4 +1,21 @@ [package] name = "activitypub-base" version = "0.1.0" -edition = "2021" +edition = "2024" + +[dependencies] +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +anyhow = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } +axum = { workspace = true } +reqwest = { workspace = true } +url = { workspace = true } +domain = { workspace = true } + +activitypub_federation = "0.7.0-beta.11" +enum_delegate = "0.2" diff --git a/crates/adapters/activitypub-base/src/activities.rs b/crates/adapters/activitypub-base/src/activities.rs new file mode 100644 index 0000000..a055a63 --- /dev/null +++ b/crates/adapters/activitypub-base/src/activities.rs @@ -0,0 +1,615 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::{ + AcceptType, CreateType, DeleteType, FollowType, RejectType, UndoType, UpdateType, + }, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename = "Announce")] +pub struct AnnounceType; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; +use crate::repository::{FollowerStatus, FollowingStatus}; + +// --- Follow --- + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FollowActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: FollowType, + pub(crate) actor: ObjectId, + pub(crate) object: ObjectId, +} + +#[async_trait::async_trait] +impl Activity for FollowActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, data: &Data) -> Result<(), Self::Error> { + let target_url = self.object.inner(); + let target_domain = match (target_url.host_str(), target_url.port()) { + (Some(host), Some(port)) => format!("{}:{}", host, port), + (Some(host), None) => host.to_string(), + _ => { + return Err(Error::bad_request(anyhow::anyhow!( + "invalid follow target URL" + ))); + } + }; + if target_domain != data.domain { + return Err(Error::bad_request(anyhow::anyhow!( + "follow target is not a local actor" + ))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + return Ok(()); + } + let _follower = self.actor.dereference(data).await?; + let local_actor = self.object.dereference(data).await?; + + if data + .federation_repo + .is_actor_blocked(local_actor.user_id, self.actor.inner().as_str()) + .await? + { + tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor"); + return Ok(()); + } + + data.federation_repo + .add_follower( + local_actor.user_id, + self.actor.inner().as_str(), + FollowerStatus::Pending, + self.id.as_str(), + ) + .await?; + + tracing::info!( + follower = %self.actor.inner(), + local_user = %local_actor.user_id, + "follow request pending approval" + ); + Ok(()) + } +} + +// --- Accept --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AcceptActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: AcceptType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for AcceptActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + return Ok(()); + } + let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner()) + .ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?; + data.federation_repo + .update_following_status( + local_user_id, + self.actor.inner().as_str(), + FollowingStatus::Accepted, + ) + .await?; + + tracing::info!(remote_actor = %self.actor.inner(), "follow accepted by remote"); + Ok(()) + } +} + +// --- Reject --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RejectActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: RejectType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for RejectActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + return Ok(()); + } + if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) { + data.federation_repo + .remove_following(user_id, self.actor.inner().as_str()) + .await?; + } + tracing::info!(actor = %self.actor.inner(), "follow rejected"); + Ok(()) + } +} + +// --- Undo --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UndoActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: UndoType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, +} + +#[async_trait::async_trait] +impl Activity for UndoActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring Undo from blocked domain"); + return Ok(()); + } + + let obj_type = self + .object + .get("type") + .and_then(|t| t.as_str()) + .unwrap_or(""); + + match obj_type { + "Follow" => { + if let Some(obj_url) = self.object.get("object").and_then(|o| o.as_str()) { + if let Ok(url) = Url::parse(obj_url) { + if let Some(user_id) = crate::urls::extract_user_id_from_url(&url) { + data.federation_repo + .remove_follower(user_id, self.actor.inner().as_str()) + .await?; + } + } + } + data.object_handler + .on_actor_removed(self.actor.inner()) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %self.actor.inner(), "unfollowed"); + } + "Add" => { + let ap_id_str = self + .object + .get("object") + .and_then(|o| o.get("id")) + .and_then(|id| id.as_str()) + .or_else(|| self.object.get("id").and_then(|id| id.as_str())); + + if let Some(ap_id_str) = ap_id_str { + if let Ok(ap_id) = Url::parse(ap_id_str) { + data.object_handler + .on_delete(&ap_id, self.actor.inner()) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(ap_id = %ap_id_str, "undo Add (watchlist remove)"); + } + } + } + other => { + tracing::debug!(kind = %other, "ignoring Undo of unknown activity type"); + } + } + + Ok(()) + } +} + +// --- Create --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: CreateType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, +} + +#[async_trait::async_trait] +impl Activity for CreateActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + return Ok(()); + } + let ap_id = self.id.clone(); + let actor_url = self.actor.inner().clone(); + data.object_handler + .on_create(&ap_id, &actor_url, self.object) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received create activity"); + Ok(()) + } +} + +// --- Delete --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DeleteActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: DeleteType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, +} + +#[async_trait::async_trait] +impl Activity for DeleteActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + return Ok(()); + } + let actor_url = self.actor.inner().clone(); + data.object_handler + .on_delete(&self.object, &actor_url) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(object = %self.object, "received delete activity"); + Ok(()) + } +} + +// --- Update --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: UpdateType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, +} + +#[async_trait::async_trait] +impl Activity for UpdateActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + return Ok(()); + } + let ap_id = self.id.clone(); + let actor_url = self.actor.inner().clone(); + data.object_handler + .on_update(&ap_id, &actor_url, self.object) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received update activity"); + Ok(()) + } +} + +// --- Announce --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AnnounceActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: AnnounceType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, + pub(crate) published: Option>, +} + +#[async_trait::async_trait] +impl Activity for AnnounceActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + return Ok(()); + } + let object_domain = self.object.host_str().unwrap_or(""); + if object_domain != data.domain { + return Ok(()); + } + data.federation_repo + .add_announce( + self.id.as_str(), + self.object.as_str(), + self.actor.inner().as_str(), + self.published.unwrap_or_else(chrono::Utc::now), + ) + .await?; + tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce"); + Ok(()) + } +} + +// --- Add --- + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename = "Add")] +pub struct AddType; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AddActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: AddType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, +} + +#[async_trait::async_trait] +impl Activity for AddActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring Add from blocked domain"); + return Ok(()); + } + let ap_id = self.id.clone(); + let actor_url = self.actor.inner().clone(); + data.object_handler + .on_create(&ap_id, &actor_url, self.object) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received Add activity"); + Ok(()) + } +} + +// --- Block --- + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename = "Block")] +pub struct BlockType; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BlockActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: BlockType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, +} + +#[async_trait::async_trait] +impl Activity for BlockActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let domain = self.actor().host_str().unwrap_or(""); + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + return Ok(()); + } + // They blocked us — remove them from our following list + if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { + let _ = data + .federation_repo + .remove_following(local_user_id, self.actor.inner().as_str()) + .await; + } + tracing::info!(actor = %self.actor.inner(), "received block"); + Ok(()) + } +} + +// --- Inbox dispatch enum --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "type")] +#[enum_delegate::implement(Activity)] +pub enum InboxActivities { + #[serde(rename = "Follow")] + Follow(FollowActivity), + #[serde(rename = "Accept")] + Accept(AcceptActivity), + #[serde(rename = "Reject")] + Reject(RejectActivity), + #[serde(rename = "Undo")] + Undo(UndoActivity), + #[serde(rename = "Create")] + Create(CreateActivity), + #[serde(rename = "Delete")] + Delete(DeleteActivity), + #[serde(rename = "Update")] + Update(UpdateActivity), + #[serde(rename = "Announce")] + Announce(AnnounceActivity), + #[serde(rename = "Add")] + Add(AddActivity), + #[serde(rename = "Block")] + Block(BlockActivity), +} diff --git a/crates/adapters/activitypub-base/src/actor_handler.rs b/crates/adapters/activitypub-base/src/actor_handler.rs new file mode 100644 index 0000000..7030967 --- /dev/null +++ b/crates/adapters/activitypub-base/src/actor_handler.rs @@ -0,0 +1,25 @@ +use activitypub_federation::{ + axum::json::FederationJson, config::Data, protocol::context::WithContext, traits::Object, +}; +use axum::extract::Path; + +use crate::actors::{Person, get_local_actor}; +use crate::data::FederationData; +use crate::error::Error; + +pub async fn actor_handler( + Path(username): Path, + data: Data, +) -> Result>, Error> { + let ap_user = data + .user_repo + .find_by_username(&username) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::bad_request(anyhow::anyhow!("user not found")))?; + + let db_actor = get_local_actor(ap_user.id, &data).await?; + let person = db_actor.into_json(&data).await?; + + Ok(FederationJson(WithContext::new_default(person))) +} diff --git a/crates/adapters/activitypub-base/src/actors.rs b/crates/adapters/activitypub-base/src/actors.rs new file mode 100644 index 0000000..01cd40d --- /dev/null +++ b/crates/adapters/activitypub-base/src/actors.rs @@ -0,0 +1,327 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + http_signatures::generate_actor_keypair, + kinds::actor::PersonType, + protocol::{public_key::PublicKey, verification::verify_domains_match}, + traits::{Actor, Object}, +}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::data::FederationData; +use crate::error::Error; +use crate::repository::RemoteActor; +use crate::user::ApProfileField; + +#[derive(Debug, Clone)] +pub struct DbActor { + pub user_id: uuid::Uuid, + pub username: String, + pub public_key_pem: String, + pub private_key_pem: Option, + pub inbox_url: Url, + pub outbox_url: Url, + pub followers_url: Url, + pub following_url: Url, + pub ap_id: Url, + pub last_refreshed_at: DateTime, + pub bio: Option, + pub avatar_url: Option, + pub banner_url: Option, + pub also_known_as: Option, + pub profile_url: Option, + pub attachment: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ApImageObject { + #[serde(rename = "type")] + pub kind: String, + pub url: Url, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Endpoints { + pub shared_inbox: Url, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProfileFieldObject { + #[serde(rename = "type")] + pub kind: String, + pub name: String, + pub value: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Person { + #[serde(rename = "type")] + kind: PersonType, + id: ObjectId, + preferred_username: String, + inbox: Url, + outbox: Url, + followers: Url, + following: Url, + public_key: PublicKey, + name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + summary: Option, + #[serde(skip_serializing_if = "Option::is_none")] + icon: Option, + #[serde(skip_serializing_if = "Option::is_none")] + url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + discoverable: Option, + manually_approves_followers: bool, + #[serde(skip_serializing_if = "Option::is_none", default)] + updated: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + endpoints: Option, + #[serde(skip_serializing_if = "Option::is_none")] + image: Option, + #[serde(rename = "alsoKnownAs", skip_serializing_if = "Vec::is_empty", default)] + also_known_as: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + attachment: Vec, +} + +pub async fn get_local_actor( + user_id: uuid::Uuid, + data: &Data, +) -> Result { + let user = data + .user_repo + .find_by_id(user_id) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?; + + let (public_key, private_key) = match data + .federation_repo + .get_local_actor_keypair(user_id) + .await? + { + Some(kp) => kp, + None => { + let kp = generate_actor_keypair()?; + data.federation_repo + .save_local_actor_keypair(user_id, kp.public_key.clone(), kp.private_key.clone()) + .await?; + (kp.public_key, kp.private_key) + } + }; + + let ap_id = crate::urls::actor_url(&data.base_url, user_id); + let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid inbox url"); + let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid outbox url"); + let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid followers url"); + let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid following url"); + + Ok(DbActor { + user_id, + username: user.username, + public_key_pem: public_key, + private_key_pem: Some(private_key), + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + bio: user.bio, + avatar_url: user.avatar_url, + banner_url: user.banner_url, + also_known_as: user.also_known_as, + profile_url: user.profile_url, + attachment: user.attachment, + }) +} + +#[async_trait::async_trait] +impl Object for DbActor { + type DataType = FederationData; + type Kind = Person; + type Error = Error; + + fn id(&self) -> &Url { + &self.ap_id + } + + fn last_refreshed_at(&self) -> Option> { + Some(self.last_refreshed_at) + } + + async fn read_from_id( + object_id: Url, + data: &Data, + ) -> Result, Self::Error> { + let user_id = match crate::urls::extract_user_id_from_url(&object_id) { + Some(id) => id, + None => return Ok(None), + }; + let user = match data.user_repo.find_by_id(user_id).await { + Ok(Some(u)) => u, + _ => return Ok(None), + }; + + let keypair = data + .federation_repo + .get_local_actor_keypair(user_id) + .await?; + + let (public_key, private_key) = match keypair { + Some(kp) => (kp.0, Some(kp.1)), + None => return Ok(None), + }; + + let ap_id = crate::urls::actor_url(&data.base_url, user_id); + let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url"); + let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url"); + let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid url"); + let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid url"); + + Ok(Some(DbActor { + user_id, + username: user.username, + public_key_pem: public_key, + private_key_pem: private_key, + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + bio: None, + avatar_url: None, + banner_url: None, + also_known_as: None, + profile_url: None, + attachment: vec![], + })) + } + + async fn into_json(self, data: &Data) -> Result { + let public_key = PublicKey { + id: format!("{}#main-key", &self.ap_id), + owner: self.ap_id.clone(), + public_key_pem: self.public_key_pem.clone(), + }; + + let icon = self.avatar_url.map(|url| ApImageObject { + kind: "Image".to_string(), + url, + }); + let image = self.banner_url.map(|url| ApImageObject { + kind: "Image".to_string(), + url, + }); + let profile_url = self.profile_url; + let also_known_as: Vec = self.also_known_as.into_iter().collect(); + let attachment: Vec = self + .attachment + .into_iter() + .map(|f| ProfileFieldObject { + kind: "PropertyValue".to_string(), + name: f.name, + value: f.value, + }) + .collect(); + + let shared_inbox = + Url::parse(&format!("{}/inbox", data.base_url)).expect("base_url is always valid"); + + Ok(Person { + kind: Default::default(), + id: self.ap_id.clone().into(), + preferred_username: self.username.clone(), + inbox: self.inbox_url.clone(), + outbox: self.outbox_url.clone(), + followers: self.followers_url.clone(), + following: self.following_url.clone(), + public_key, + name: Some(self.username.clone()), + summary: self.bio.clone(), + icon, + url: profile_url, + discoverable: Some(true), + manually_approves_followers: true, + updated: Some(self.last_refreshed_at), + endpoints: Some(Endpoints { shared_inbox }), + image, + also_known_as, + attachment, + }) + } + + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + _data: &Data, + ) -> Result<(), Self::Error> { + verify_domains_match(json.id.inner(), expected_domain)?; + Ok(()) + } + + async fn from_json(json: Self::Kind, data: &Data) -> Result { + let actor = RemoteActor { + url: json.id.inner().to_string(), + handle: json.preferred_username.clone(), + inbox_url: json.inbox.to_string(), + shared_inbox_url: None, + display_name: json.name.clone(), + avatar_url: json.icon.as_ref().map(|i| i.url.to_string()), + outbox_url: Some(json.outbox.to_string()), + }; + data.federation_repo.upsert_remote_actor(actor).await?; + + let url_str = json.id.inner().to_string(); + let user_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes()); + let ap_id = json.id.inner().clone(); + let inbox_url = json.inbox.clone(); + let outbox_url = json.outbox.clone(); + let followers_url = json.followers.clone(); + let following_url = json.following.clone(); + + Ok(DbActor { + user_id, + username: json.preferred_username.clone(), + public_key_pem: json.public_key.public_key_pem, + private_key_pem: None, + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + bio: None, + avatar_url: None, + banner_url: None, + also_known_as: None, + profile_url: None, + attachment: vec![], + }) + } +} + +impl Actor for DbActor { + fn public_key_pem(&self) -> &str { + &self.public_key_pem + } + + fn private_key_pem(&self) -> Option { + self.private_key_pem.clone() + } + + fn inbox(&self) -> Url { + self.inbox_url.clone() + } +} + +#[cfg(test)] +#[path = "tests/actors.rs"] +mod tests; diff --git a/crates/adapters/activitypub-base/src/content.rs b/crates/adapters/activitypub-base/src/content.rs new file mode 100644 index 0000000..83aad72 --- /dev/null +++ b/crates/adapters/activitypub-base/src/content.rs @@ -0,0 +1,47 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use url::Url; + +#[async_trait] +pub trait ApObjectHandler: Send + Sync { + /// Returns (ap_id, serialized object) for all local content owned by this user. + /// Used by outbox (count) and backfill (delivery). Must only return locally-authored content. + async fn get_local_objects_for_user( + &self, + user_id: uuid::Uuid, + ) -> anyhow::Result>; + + /// Returns up to `limit` objects ordered newest-first, published before `before`. + /// Returns (ap_id, object_json, published_at). + async fn get_local_objects_page( + &self, + user_id: uuid::Uuid, + before: Option>, + limit: usize, + ) -> anyhow::Result)>>; + + /// Incoming Create activity — persist remote content. + async fn on_create( + &self, + ap_id: &Url, + actor_url: &Url, + object: serde_json::Value, + ) -> anyhow::Result<()>; + + /// Incoming Update activity — update existing remote content. + async fn on_update( + &self, + ap_id: &Url, + actor_url: &Url, + object: serde_json::Value, + ) -> anyhow::Result<()>; + + /// Incoming Delete activity — remove specific remote content. + async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()>; + + /// Actor unfollowed/was removed — clean up all their remote content. + async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>; + + /// Total number of locally-authored posts across all users. + async fn count_local_posts(&self) -> anyhow::Result; +} diff --git a/crates/adapters/activitypub-base/src/data.rs b/crates/adapters/activitypub-base/src/data.rs new file mode 100644 index 0000000..a4079f6 --- /dev/null +++ b/crates/adapters/activitypub-base/src/data.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; + +use crate::content::ApObjectHandler; +use crate::repository::FederationRepository; +use crate::user::ApUserRepository; +use domain::ports::EventPublisher; + +#[derive(Clone)] +pub struct FederationData { + pub(crate) federation_repo: Arc, + pub(crate) user_repo: Arc, + pub(crate) object_handler: Arc, + pub(crate) base_url: String, + pub(crate) domain: String, + pub(crate) allow_registration: bool, + pub(crate) software_name: String, + pub(crate) event_publisher: Option>, +} + +impl FederationData { + pub fn new( + federation_repo: Arc, + user_repo: Arc, + object_handler: Arc, + base_url: String, + allow_registration: bool, + software_name: String, + event_publisher: Option>, + ) -> Self { + let domain = base_url + .trim_start_matches("https://") + .trim_start_matches("http://") + .split('/') + .next() + .unwrap_or("") + .to_string(); + Self { + federation_repo, + user_repo, + object_handler, + base_url, + domain, + allow_registration, + software_name, + event_publisher, + } + } +} diff --git a/crates/adapters/activitypub-base/src/error.rs b/crates/adapters/activitypub-base/src/error.rs new file mode 100644 index 0000000..d631755 --- /dev/null +++ b/crates/adapters/activitypub-base/src/error.rs @@ -0,0 +1,48 @@ +use std::fmt::{Display, Formatter}; + +use axum::http::StatusCode; + +#[derive(Debug)] +pub struct Error(pub(crate) anyhow::Error, pub(crate) StatusCode); + +impl Error { + pub fn not_found(e: impl Into) -> Self { + Self(e.into(), StatusCode::NOT_FOUND) + } + + pub fn bad_request(e: impl Into) -> Self { + Self(e.into(), StatusCode::BAD_REQUEST) + } +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl From for Error +where + T: Into, +{ + fn from(t: T) -> Self { + Error(t.into(), StatusCode::INTERNAL_SERVER_ERROR) + } +} + +impl axum::response::IntoResponse for Error { + fn into_response(self) -> axum::response::Response { + let status = self.1; + if status.is_server_error() { + tracing::error!(error = %self.0, status = status.as_u16(), "federation error"); + } else { + tracing::debug!(error = %self.0, status = status.as_u16(), "federation response"); + } + let body = if status.is_server_error() { + "internal server error".to_string() + } else { + self.0.to_string() + }; + (status, body).into_response() + } +} diff --git a/crates/adapters/activitypub-base/src/federation.rs b/crates/adapters/activitypub-base/src/federation.rs new file mode 100644 index 0000000..5ccd975 --- /dev/null +++ b/crates/adapters/activitypub-base/src/federation.rs @@ -0,0 +1,50 @@ +use activitypub_federation::config::{Data, FederationConfig, FederationMiddleware, UrlVerifier}; +use activitypub_federation::error::Error as FedError; +use url::Url; + +use crate::data::FederationData; + +#[derive(Clone)] +struct PermissiveVerifier; + +#[async_trait::async_trait] +impl UrlVerifier for PermissiveVerifier { + async fn verify(&self, _url: &Url) -> Result<(), FedError> { + Ok(()) + } +} + +#[derive(Clone)] +pub struct ApFederationConfig(pub FederationConfig); + +impl ApFederationConfig { + pub async fn new(data: FederationData, debug: bool) -> anyhow::Result { + let config = if debug { + FederationConfig::builder() + .domain(&data.domain) + .app_data(data) + .debug(true) + .http_signature_compat(true) + .url_verifier(Box::new(PermissiveVerifier)) + .build() + .await? + } else { + FederationConfig::builder() + .domain(&data.domain) + .app_data(data) + .debug(false) + .http_signature_compat(true) + .build() + .await? + }; + Ok(Self(config)) + } + + pub fn to_request_data(&self) -> Data { + self.0.to_request_data() + } + + pub fn middleware(&self) -> FederationMiddleware { + FederationMiddleware::new(self.0.clone()) + } +} diff --git a/crates/adapters/activitypub-base/src/followers_handler.rs b/crates/adapters/activitypub-base/src/followers_handler.rs new file mode 100644 index 0000000..36b4800 --- /dev/null +++ b/crates/adapters/activitypub-base/src/followers_handler.rs @@ -0,0 +1,130 @@ +use activitypub_federation::{axum::json::FederationJson, config::Data}; +use axum::extract::{Path, Query}; +use serde::Deserialize; +use serde_json::json; + +use crate::data::FederationData; +use crate::error::Error; + +const PAGE_SIZE: usize = 20; + +#[derive(Deserialize)] +pub struct PageQuery { + page: Option, +} + +pub async fn followers_handler( + Path(user_id_str): Path, + Query(query): Query, + data: Data, +) -> Result, Error> { + let user_id = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; + + data.user_repo + .find_by_id(user_id) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let collection_id = format!("{}/users/{}/followers", data.base_url, user_id_str); + let total = data + .federation_repo + .count_followers(user_id) + .await + .map_err(Error::from)?; + + if let Some(page) = query.page { + let page = page.max(1); + let offset = (page.saturating_sub(1) as usize) * PAGE_SIZE; + let followers = data + .federation_repo + .get_followers_page(user_id, offset as u32, PAGE_SIZE) + .await + .map_err(Error::from)?; + + let has_next = offset + followers.len() < total; + let items: Vec = followers.into_iter().map(|f| f.actor.url).collect(); + + let mut obj = json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollectionPage", + "id": format!("{}?page={}", collection_id, page), + "partOf": collection_id, + "totalItems": total, + "orderedItems": items, + }); + + if has_next { + obj["next"] = json!(format!("{}?page={}", collection_id, page + 1)); + } + + Ok(FederationJson(obj)) + } else { + Ok(FederationJson(json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": collection_id, + "totalItems": total, + "first": format!("{}?page=1", collection_id), + }))) + } +} + +pub async fn following_handler( + Path(user_id_str): Path, + Query(query): Query, + data: Data, +) -> Result, Error> { + let user_id = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; + + data.user_repo + .find_by_id(user_id) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let collection_id = format!("{}/users/{}/following", data.base_url, user_id_str); + let total = data + .federation_repo + .count_following(user_id) + .await + .map_err(Error::from)?; + + if let Some(page) = query.page { + let page = page.max(1); + let offset = (page.saturating_sub(1) as usize) * PAGE_SIZE; + let following = data + .federation_repo + .get_following_page(user_id, offset as u32, PAGE_SIZE) + .await + .map_err(Error::from)?; + + let has_next = offset + following.len() < total; + let items: Vec = following.into_iter().map(|a| a.url).collect(); + + let mut obj = json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollectionPage", + "id": format!("{}?page={}", collection_id, page), + "partOf": collection_id, + "totalItems": total, + "orderedItems": items, + }); + + if has_next { + obj["next"] = json!(format!("{}?page={}", collection_id, page + 1)); + } + + Ok(FederationJson(obj)) + } else { + Ok(FederationJson(json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": collection_id, + "totalItems": total, + "first": format!("{}?page=1", collection_id), + }))) + } +} diff --git a/crates/adapters/activitypub-base/src/inbox.rs b/crates/adapters/activitypub-base/src/inbox.rs new file mode 100644 index 0000000..2f2d063 --- /dev/null +++ b/crates/adapters/activitypub-base/src/inbox.rs @@ -0,0 +1,18 @@ +use activitypub_federation::{ + axum::inbox::{ActivityData, receive_activity}, + config::Data, + protocol::context::WithContext, +}; + +use crate::activities::InboxActivities; +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +pub async fn inbox_handler( + data: Data, + activity_data: ActivityData, +) -> Result<(), Error> { + receive_activity::, DbActor, FederationData>(activity_data, &data) + .await +} diff --git a/crates/adapters/activitypub-base/src/lib.rs b/crates/adapters/activitypub-base/src/lib.rs index e69de29..2ac9c64 100644 --- a/crates/adapters/activitypub-base/src/lib.rs +++ b/crates/adapters/activitypub-base/src/lib.rs @@ -0,0 +1,27 @@ +pub mod activities; +pub mod actor_handler; +pub mod actors; +pub mod content; +pub mod data; +pub mod error; +pub mod federation; +pub mod followers_handler; +pub mod inbox; +pub mod nodeinfo; +pub mod outbox; +pub mod repository; +pub mod service; +pub(crate) mod urls; +pub use urls::AS_PUBLIC; +pub mod user; +pub mod webfinger; + +pub use content::ApObjectHandler; +pub use data::FederationData; +pub use error::Error; +pub use federation::ApFederationConfig; +pub use repository::{ + BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, +}; +pub use service::ActivityPubService; +pub use user::{ApProfileField, ApUser, ApUserRepository}; diff --git a/crates/adapters/activitypub-base/src/nodeinfo.rs b/crates/adapters/activitypub-base/src/nodeinfo.rs new file mode 100644 index 0000000..1b95ae8 --- /dev/null +++ b/crates/adapters/activitypub-base/src/nodeinfo.rs @@ -0,0 +1,80 @@ +use activitypub_federation::config::Data; +use axum::Json; +use serde::Serialize; + +use crate::data::FederationData; +use crate::error::Error; + +#[derive(Serialize)] +pub struct NodeInfoWellKnown { + pub links: Vec, +} + +#[derive(Serialize)] +pub struct NodeInfoLink { + pub rel: String, + pub href: String, +} + +#[derive(Serialize)] +pub struct NodeInfoSoftware { + pub name: String, + pub version: String, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NodeInfoUsage { + pub users: NodeInfoUsers, + pub local_posts: u64, +} + +#[derive(Serialize)] +pub struct NodeInfoUsers { + pub total: usize, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NodeInfo { + pub version: String, + pub software: NodeInfoSoftware, + pub protocols: Vec, + pub usage: NodeInfoUsage, + pub open_registrations: bool, +} + +pub async fn nodeinfo_well_known_handler( + data: Data, +) -> Result, Error> { + let href = format!("{}/nodeinfo/2.0", data.base_url); + Ok(Json(NodeInfoWellKnown { + links: vec![NodeInfoLink { + rel: "http://nodeinfo.diaspora.software/ns/schema/2.0".to_string(), + href, + }], + })) +} + +pub async fn nodeinfo_handler(data: Data) -> Result, Error> { + let user_count = data.user_repo.count_users().await.unwrap_or(0); + let local_posts = data.object_handler.count_local_posts().await.unwrap_or(0); + + Ok(Json(NodeInfo { + version: "2.0".to_string(), + software: NodeInfoSoftware { + name: data.software_name.clone(), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + protocols: vec!["activitypub".to_string()], + usage: NodeInfoUsage { + users: NodeInfoUsers { total: user_count }, + local_posts, + }, + open_registrations: data.allow_registration, + })) +} + +#[cfg(test)] +#[path = "tests/nodeinfo.rs"] +mod tests; diff --git a/crates/adapters/activitypub-base/src/outbox.rs b/crates/adapters/activitypub-base/src/outbox.rs new file mode 100644 index 0000000..d9a209e --- /dev/null +++ b/crates/adapters/activitypub-base/src/outbox.rs @@ -0,0 +1,138 @@ +use axum::extract::{Path, Query}; +use axum::response::IntoResponse; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use activitypub_federation::{ + config::Data, fetch::object_id::ObjectId, kinds::activity::CreateType, + protocol::context::WithContext, +}; + +use crate::{activities::CreateActivity, data::FederationData, error::Error}; + +const PAGE_SIZE: usize = 20; + +#[derive(Deserialize)] +pub struct OutboxQuery { + page: Option, + before: Option, +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrderedCollection { + #[serde(rename = "@context")] + context: String, + #[serde(rename = "type")] + kind: String, + id: String, + total_items: u64, + first: String, +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrderedCollectionPage { + #[serde(rename = "@context")] + context: String, + #[serde(rename = "type")] + kind: String, + id: String, + part_of: String, + ordered_items: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + next: Option, +} + +pub async fn outbox_handler( + Path(user_id_str): Path, + Query(query): Query, + data: Data, +) -> Result { + let uuid = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; + + data.user_repo + .find_by_id(uuid) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let outbox_url = format!("{}/users/{}/outbox", data.base_url, user_id_str); + + if query.page.unwrap_or(false) { + let before: Option> = query.before.as_deref().and_then(|s| s.parse().ok()); + + let items = data + .object_handler + .get_local_objects_page(uuid, before, PAGE_SIZE) + .await + .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?; + + let actor_url: Url = format!("{}/users/{}", data.base_url, user_id_str) + .parse() + .expect("valid url"); + + let has_more = items.len() == PAGE_SIZE; + let oldest_ts = items.last().map(|(_, _, ts)| *ts); + + let followers_url = format!("{}/followers", actor_url); + let ordered_items: Vec = items + .into_iter() + .map(|(ap_id, object, _)| { + let create_id = Url::parse(&format!("{}/activity", ap_id)).expect("valid url"); + serde_json::to_value(WithContext::new_default(CreateActivity { + id: create_id, + kind: CreateType::default(), + actor: ObjectId::from(actor_url.clone()), + object, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![followers_url.clone()], + })) + .expect("serializable") + }) + .collect(); + + let page_id = match &query.before { + Some(b) => format!("{}?page=true&before={}", outbox_url, b), + None => format!("{}?page=true", outbox_url), + }; + + let next = if has_more { + oldest_ts.map(|ts| { + // Use RFC 3339 with Z suffix (no + sign) to avoid percent-encoding + let ts_str = ts.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); + format!("{}?page=true&before={}", outbox_url, ts_str) + }) + } else { + None + }; + + Ok(axum::Json(OrderedCollectionPage { + context: "https://www.w3.org/ns/activitystreams".to_string(), + kind: "OrderedCollectionPage".to_string(), + id: page_id, + part_of: outbox_url, + ordered_items, + next, + }) + .into_response()) + } else { + let total = data + .object_handler + .get_local_objects_for_user(uuid) + .await + .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))? + .len() as u64; + + Ok(axum::Json(OrderedCollection { + context: "https://www.w3.org/ns/activitystreams".to_string(), + kind: "OrderedCollection".to_string(), + id: outbox_url.clone(), + total_items: total, + first: format!("{}?page=true", outbox_url), + }) + .into_response()) + } +} diff --git a/crates/adapters/activitypub-base/src/repository.rs b/crates/adapters/activitypub-base/src/repository.rs new file mode 100644 index 0000000..0aab3c2 --- /dev/null +++ b/crates/adapters/activitypub-base/src/repository.rs @@ -0,0 +1,134 @@ +use anyhow::Result; +use async_trait::async_trait; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FollowerStatus { + Pending, + Accepted, + Rejected, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FollowingStatus { + Pending, + Accepted, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteActor { + pub url: String, + pub handle: String, + pub inbox_url: String, + pub shared_inbox_url: Option, + pub display_name: Option, + pub avatar_url: Option, + pub outbox_url: Option, +} + +#[derive(Debug, Clone)] +pub struct Follower { + pub actor: RemoteActor, + pub status: FollowerStatus, +} + +#[derive(Debug, Clone)] +pub struct BlockedDomain { + pub domain: String, + pub reason: Option, + pub blocked_at: String, +} + +#[async_trait] +pub trait FederationRepository: Send + Sync { + async fn add_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + follow_activity_id: &str, + ) -> Result<()>; + async fn get_follower_follow_activity_id( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result>; + async fn remove_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result<()>; + async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result>; + async fn get_followers_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result>; + async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result; + async fn get_following_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result>; + async fn update_follower_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + ) -> Result<()>; + async fn add_following( + &self, + local_user_id: uuid::Uuid, + actor: RemoteActor, + follow_activity_id: &str, + ) -> Result<()>; + async fn get_follow_activity_id( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result>; + async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; + async fn get_following(&self, local_user_id: uuid::Uuid) -> Result>; + async fn count_following(&self, local_user_id: uuid::Uuid) -> Result; + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>; + async fn get_remote_actor(&self, actor_url: &str) -> Result>; + async fn get_local_actor_keypair( + &self, + user_id: uuid::Uuid, + ) -> Result>; + async fn save_local_actor_keypair( + &self, + user_id: uuid::Uuid, + public_key: String, + private_key: String, + ) -> Result<()>; + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result>; + async fn update_following_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowingStatus, + ) -> Result<()>; + async fn get_following_outbox_url( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result>; + async fn add_announce( + &self, + activity_id: &str, + object_url: &str, + actor_url: &str, + announced_at: chrono::DateTime, + ) -> Result<()>; + async fn count_announces(&self, object_url: &str) -> Result; + async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()>; + async fn remove_blocked_domain(&self, domain: &str) -> Result<()>; + async fn get_blocked_domains(&self) -> Result>; + async fn is_domain_blocked(&self, domain: &str) -> Result; + async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; + async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; + async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result>; + async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result; +} diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs new file mode 100644 index 0000000..a536187 --- /dev/null +++ b/crates/adapters/activitypub-base/src/service.rs @@ -0,0 +1,1221 @@ +use std::sync::Arc; + +use activitypub_federation::{ + activity_sending::SendActivityTask, + fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor}, + protocol::context::WithContext, + traits::Actor, +}; +use axum::{Router, routing::get, routing::post}; +use url::Url; + +use crate::{ + activities::{ + AcceptActivity, CreateActivity, FollowActivity, RejectActivity, UndoActivity, + UpdateActivity, + }, + actors::{DbActor, get_local_actor}, + content::ApObjectHandler, + data::FederationData, + federation::ApFederationConfig, + followers_handler::{followers_handler, following_handler}, + inbox::inbox_handler, + nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, + outbox::outbox_handler, + repository::{ + BlockedDomain, FederationRepository, FollowerStatus, FollowingStatus, RemoteActor, + }, + urls::activity_url, + user::ApUserRepository, + webfinger::webfinger_handler, +}; + +fn collect_inboxes(followers: &[crate::repository::Follower]) -> Vec { + let mut seen = std::collections::HashSet::new(); + let mut inboxes = Vec::new(); + for f in followers { + let inbox_str = f + .actor + .shared_inbox_url + .as_deref() + .unwrap_or(&f.actor.inbox_url); + if seen.insert(inbox_str.to_string()) + && let Ok(url) = Url::parse(inbox_str) + { + inboxes.push(url); + } + } + inboxes +} + +pub(crate) async fn send_with_retry( + sends: Vec, + data: &activitypub_federation::config::Data, +) -> Vec { + let mut failures = vec![]; + for send in sends { + let mut delay = std::time::Duration::from_secs(1); + for attempt in 1..=3u32 { + match send.clone().sign_and_send(data).await { + Ok(()) => break, + Err(e) if attempt < 3 => { + tracing::warn!(attempt, error = %e, "delivery failed, retrying"); + tokio::time::sleep(delay).await; + delay *= 2; + } + Err(e) => { + tracing::error!(attempt, error = %e, "delivery failed permanently"); + failures.push(anyhow::anyhow!(e)); + } + } + } + } + failures +} + +pub struct ActivityPubService { + federation_config: ApFederationConfig, + base_url: String, +} + +impl ActivityPubService { + pub async fn new( + repo: Arc, + user_repo: Arc, + object_handler: Arc, + base_url: String, + allow_registration: bool, + software_name: String, + debug: bool, + event_publisher: Option>, + ) -> anyhow::Result { + let data = FederationData::new( + repo, + user_repo, + object_handler, + base_url.clone(), + allow_registration, + software_name, + event_publisher, + ); + let federation_config = ApFederationConfig::new(data, debug).await?; + Ok(Self { + federation_config, + base_url, + }) + } + + pub fn federation_config(&self) -> &ApFederationConfig { + &self.federation_config + } + + pub fn request_data(&self) -> activitypub_federation::config::Data { + self.federation_config.to_request_data() + } + + pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result { + use activitypub_federation::traits::Object; + let uuid = uuid::Uuid::parse_str(user_id_str)?; + let data = self.federation_config.to_request_data(); + let actor = get_local_actor(uuid, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let person = actor + .into_json(&data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + Ok(serde_json::to_string(&WithContext::new_default(person))?) + } + + pub fn router(&self) -> Router { + Router::new() + .route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler)) + .route("/nodeinfo/2.0", get(nodeinfo_handler)) + .route("/.well-known/webfinger", get(webfinger_handler)) + .route("/inbox", post(inbox_handler)) + .route("/users/{id}/inbox", post(inbox_handler)) + .route("/users/{id}/outbox", get(outbox_handler)) + .route("/users/{id}/followers", get(followers_handler)) + .route("/users/{id}/following", get(following_handler)) + .layer(self.federation_config.middleware()) + } + + pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + + let normalized = handle.trim_start_matches('@'); + let parts: Vec<&str> = normalized.splitn(2, '@').collect(); + if parts.len() == 2 && parts[1] == data.domain { + return self.follow_local(local_user_id, parts[0], &data).await; + } + + let remote_actor: DbActor = webfinger_resolve_actor(handle, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let follow_id_str = follow_id.to_string(); + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_actor.ap_id.clone()), + }; + let follow_with_ctx = WithContext::new_default(follow); + + let sends = SendActivityTask::prepare( + &follow_with_ctx, + &local_actor, + vec![remote_actor.inbox()], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!( + count = failures.len(), + "some activity deliveries failed permanently" + ); + } + + let remote = RemoteActor { + url: remote_actor.ap_id.to_string(), + handle: remote_actor.username.clone(), + inbox_url: remote_actor.inbox_url.to_string(), + shared_inbox_url: None, + display_name: Some(remote_actor.username.clone()), + avatar_url: None, + outbox_url: Some(remote_actor.outbox_url.to_string()), + }; + data.federation_repo + .add_following(local_user_id, remote, &follow_id_str) + .await?; + + Ok(()) + } + + pub async fn unfollow( + &self, + local_user_id: uuid::Uuid, + actor_url_str: &str, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + + if actor_url_str.starts_with(&self.base_url) { + return self + .unfollow_local(local_user_id, actor_url_str, &data) + .await; + } + + let remote = data + .federation_repo + .get_remote_actor(actor_url_str) + .await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; + + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let remote_ap_id = Url::parse(actor_url_str)?; + let inbox = Url::parse(&remote.inbox_url)?; + + let follow_activity_id_str = data + .federation_repo + .get_follow_activity_id(local_user_id, actor_url_str) + .await?; + let follow_id = match follow_activity_id_str { + Some(id) => Url::parse(&id)?, + None => activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + }; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_ap_id), + }; + + let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let undo = UndoActivity { + id: undo_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: serde_json::to_value(&follow).map_err(|e| anyhow::anyhow!("{e}"))?, + }; + + let sends = SendActivityTask::prepare( + &WithContext::new_default(undo), + &local_actor, + vec![inbox], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!( + count = failures.len(), + "some activity deliveries failed permanently" + ); + } + + data.federation_repo + .remove_following(local_user_id, actor_url_str) + .await?; + + data.object_handler + .on_actor_removed(&Url::parse(actor_url_str)?) + .await?; + + Ok(()) + } + + pub async fn accept_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let remote_actor = data + .federation_repo + .get_remote_actor(remote_actor_url) + .await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; + + let follow_id_str = data + .federation_repo + .get_follower_follow_activity_id(local_user_id, remote_actor_url) + .await? + .ok_or_else(|| { + anyhow::anyhow!("follow activity id not found for {}", remote_actor_url) + })?; + let follow_id = Url::parse(&follow_id_str)?; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(Url::parse(remote_actor_url)?), + object: ObjectId::from(local_actor.ap_id.clone()), + }; + let accept = AcceptActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: follow, + }; + + data.federation_repo + .update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted) + .await?; + + let inbox = Url::parse(&remote_actor.inbox_url)?; + let sends = SendActivityTask::prepare( + &WithContext::new_default(accept), + &local_actor, + vec![inbox.clone()], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!( + "failed to deliver Accept activity, but follower is marked accepted locally" + ); + } + + let target_inbox = remote_actor + .shared_inbox_url + .clone() + .unwrap_or_else(|| remote_actor.inbox_url.clone()); + self.spawn_backfill(local_user_id, target_inbox); + + Ok(()) + } + + pub async fn reject_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let remote_actor = data + .federation_repo + .get_remote_actor(remote_actor_url) + .await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; + + let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(Url::parse(remote_actor_url)?), + object: ObjectId::from(local_actor.ap_id.clone()), + }; + let reject = RejectActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: follow, + }; + + let inbox = Url::parse(&remote_actor.inbox_url)?; + let sends = SendActivityTask::prepare( + &WithContext::new_default(reject), + &local_actor, + vec![inbox], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!( + count = failures.len(), + "some activity deliveries failed permanently" + ); + } + + data.federation_repo + .remove_follower(local_user_id, remote_actor_url) + .await?; + + Ok(()) + } + + pub async fn get_pending_followers( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo + .get_pending_followers(local_user_id) + .await + } + + pub async fn get_accepted_followers( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + let followers = data.federation_repo.get_followers(local_user_id).await?; + Ok(followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .map(|f| f.actor) + .collect()) + } + + pub async fn count_accepted_followers( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result { + let data = self.federation_config.to_request_data(); + let followers = data.federation_repo.get_followers(local_user_id).await?; + Ok(followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .count()) + } + + pub async fn get_following( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo.get_following(local_user_id).await + } + + pub async fn count_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result { + let data = self.federation_config.to_request_data(); + data.federation_repo.count_following(local_user_id).await + } + + pub async fn remove_follower( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo + .remove_follower(local_user_id, actor_url) + .await + } + + /// Broadcast a single object to all accepted followers as a Create activity. + /// Called by project-specific event handlers when new content is created. + pub async fn broadcast_to_followers( + &self, + local_user_id: uuid::Uuid, + ap_id: Url, + object: serde_json::Value, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let blocked = data + .federation_repo + .get_blocked_actors(local_user_id) + .await + .unwrap_or_default(); + let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); + let blocked_domains = data + .federation_repo + .get_blocked_domains() + .await + .unwrap_or_default(); + let blocked_domain_set: std::collections::HashSet = + blocked_domains.into_iter().map(|d| d.domain).collect(); + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .filter(|f| !blocked_set.contains(&f.actor.url)) + .filter(|f| { + let domain = url::Url::parse(&f.actor.inbox_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + !blocked_domain_set.contains(&domain) + }) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let create = CreateActivity { + id: ap_id.clone(), + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let create_with_ctx = WithContext::new_default(create); + + let inboxes = collect_inboxes(&accepted); + + let sends = + SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!( + count = failures.len(), + "some activity deliveries failed permanently" + ); + } + + Ok(()) + } + + /// Broadcast a Delete activity to all accepted followers for a removed review. + pub async fn broadcast_delete_to_followers( + &self, + local_user_id: uuid::Uuid, + ap_id: Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let blocked = data + .federation_repo + .get_blocked_actors(local_user_id) + .await + .unwrap_or_default(); + let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); + let blocked_domains = data + .federation_repo + .get_blocked_domains() + .await + .unwrap_or_default(); + let blocked_domain_set: std::collections::HashSet = + blocked_domains.into_iter().map(|d| d.domain).collect(); + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .filter(|f| !blocked_set.contains(&f.actor.url)) + .filter(|f| { + let domain = url::Url::parse(&f.actor.inbox_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + !blocked_domain_set.contains(&domain) + }) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let delete_id = + crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let delete = crate::activities::DeleteActivity { + id: delete_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ap_id, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let delete_with_ctx = WithContext::new_default(delete); + let inboxes = collect_inboxes(&accepted); + let sends = + SendActivityTask::prepare(&delete_with_ctx, &local_actor, inboxes, &data).await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!( + count = failures.len(), + "some delete activity deliveries failed" + ); + } + Ok(()) + } + + /// Broadcast an Add(WatchlistObject) activity to all accepted followers. + pub async fn broadcast_add_to_followers( + &self, + local_user_id: uuid::Uuid, + ap_id: Url, + object: serde_json::Value, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let blocked = data + .federation_repo + .get_blocked_actors(local_user_id) + .await + .unwrap_or_default(); + let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); + let blocked_domains = data + .federation_repo + .get_blocked_domains() + .await + .unwrap_or_default(); + let blocked_domain_set: std::collections::HashSet = + blocked_domains.into_iter().map(|d| d.domain).collect(); + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .filter(|f| !blocked_set.contains(&f.actor.url)) + .filter(|f| { + let domain = url::Url::parse(&f.actor.inbox_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + !blocked_domain_set.contains(&domain) + }) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let add = crate::activities::AddActivity { + id: ap_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let add_with_ctx = WithContext::new_default(add); + let inboxes = collect_inboxes(&accepted); + let sends = SendActivityTask::prepare(&add_with_ctx, &local_actor, inboxes, &data).await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some Add deliveries failed"); + } + Ok(()) + } + + /// Broadcast an Undo(Add) activity to all accepted followers. + pub async fn broadcast_undo_add_to_followers( + &self, + local_user_id: uuid::Uuid, + watchlist_entry_ap_id: Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let blocked = data + .federation_repo + .get_blocked_actors(local_user_id) + .await + .unwrap_or_default(); + let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); + let blocked_domains = data + .federation_repo + .get_blocked_domains() + .await + .unwrap_or_default(); + let blocked_domain_set: std::collections::HashSet = + blocked_domains.into_iter().map(|d| d.domain).collect(); + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .filter(|f| !blocked_set.contains(&f.actor.url)) + .filter(|f| { + let domain = url::Url::parse(&f.actor.inbox_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + !blocked_domain_set.contains(&domain) + }) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let undo_id = + crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let undo = crate::activities::UndoActivity { + id: undo_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: serde_json::json!({ + "type": "Add", + "id": watchlist_entry_ap_id.as_str(), + "object": { "id": watchlist_entry_ap_id.as_str() } + }), + }; + let undo_with_ctx = WithContext::new_default(undo); + let inboxes = collect_inboxes(&accepted); + let sends = SendActivityTask::prepare(&undo_with_ctx, &local_actor, inboxes, &data).await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some Undo(Add) deliveries failed"); + } + Ok(()) + } + + /// Broadcast an Update(Note) activity to all accepted followers for an edited review. + pub async fn broadcast_update_to_followers( + &self, + local_user_id: uuid::Uuid, + object: serde_json::Value, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let blocked = data + .federation_repo + .get_blocked_actors(local_user_id) + .await + .unwrap_or_default(); + let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); + let blocked_domains = data + .federation_repo + .get_blocked_domains() + .await + .unwrap_or_default(); + let blocked_domain_set: std::collections::HashSet = + blocked_domains.into_iter().map(|d| d.domain).collect(); + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .filter(|f| !blocked_set.contains(&f.actor.url)) + .filter(|f| { + let domain = url::Url::parse(&f.actor.inbox_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + !blocked_domain_set.contains(&domain) + }) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let update_id = Url::parse(&format!( + "{}/activities/update/{}", + self.base_url, + uuid::Uuid::new_v4() + ))?; + let update = crate::activities::UpdateActivity { + id: update_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let update_with_ctx = WithContext::new_default(update); + let inboxes = collect_inboxes(&accepted); + let sends = + SendActivityTask::prepare(&update_with_ctx, &local_actor, inboxes, &data).await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!( + count = failures.len(), + "some update activity deliveries failed" + ); + } + Ok(()) + } + + pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> { + use activitypub_federation::traits::Object; + + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let person = local_actor + .clone() + .into_json(&data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + // Wrap with @context so Mastodon's JSON-LD processor can resolve field names. + let person_json = serde_json::to_value(&WithContext::new_default(person))?; + + let update_id = Url::parse(&format!( + "{}/activities/update/{}", + self.base_url, + uuid::Uuid::new_v4() + ))?; + + let update = UpdateActivity { + id: update_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: person_json, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + + let followers = data.federation_repo.get_followers(user_id).await?; + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .collect(); + + if accepted.is_empty() { + tracing::info!(user_id = %user_id, "no accepted followers, skipping actor update broadcast"); + return Ok(()); + } + + let inboxes = collect_inboxes(&accepted); + tracing::info!( + user_id = %user_id, + follower_count = accepted.len(), + inbox_count = inboxes.len(), + inboxes = ?inboxes, + "broadcasting actor update" + ); + + let sends = SendActivityTask::prepare( + &WithContext::new_default(update), + &local_actor, + inboxes, + &data, + ) + .await?; + + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + return Err(anyhow::anyhow!( + "actor update delivery failed for {} inbox(es): {}", + failures.len(), + failures + .iter() + .map(|e| e.to_string()) + .collect::>() + .join("; ") + )); + } + tracing::info!(user_id = %user_id, "actor update broadcast complete"); + Ok(()) + } + + pub async fn block_actor( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + + data.federation_repo + .add_blocked_actor(local_user_id, actor_url) + .await?; + let _ = data + .federation_repo + .remove_follower(local_user_id, actor_url) + .await; + let _ = data + .federation_repo + .remove_following(local_user_id, actor_url) + .await; + + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + if let Ok(Some(remote_actor)) = data.federation_repo.get_remote_actor(actor_url).await { + let block_id = + crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let block = crate::activities::BlockActivity { + id: block_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: Url::parse(actor_url)?, + }; + let inbox = Url::parse(&remote_actor.inbox_url)?; + let sends = SendActivityTask::prepare( + &WithContext::new_default(block), + &local_actor, + vec![inbox], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(actor = %actor_url, "failed to deliver Block activity"); + } + } + + Ok(()) + } + + pub async fn unblock_actor( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo + .remove_blocked_actor(local_user_id, actor_url) + .await + } + + pub async fn get_blocked_actors( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + let actor_urls = data + .federation_repo + .get_blocked_actors(local_user_id) + .await?; + let mut actors = Vec::new(); + for url in actor_urls { + let actor = match data.federation_repo.get_remote_actor(&url).await { + Ok(Some(a)) => a, + _ => RemoteActor { + url: url.clone(), + handle: url.clone(), + inbox_url: url.clone(), + shared_inbox_url: None, + display_name: None, + avatar_url: None, + outbox_url: None, + }, + }; + actors.push(actor); + } + Ok(actors) + } + + pub async fn add_blocked_domain( + &self, + domain: &str, + reason: Option<&str>, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo + .add_blocked_domain(domain, reason) + .await + } + + pub async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.remove_blocked_domain(domain).await + } + + pub async fn get_blocked_domains(&self) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo.get_blocked_domains().await + } + + async fn follow_local( + &self, + local_user_id: uuid::Uuid, + target_username: &str, + data: &activitypub_federation::config::Data, + ) -> anyhow::Result<()> { + let target = data + .user_repo + .find_by_username(target_username) + .await? + .ok_or_else(|| anyhow::anyhow!("user not found: {}", target_username))?; + + if target.id == local_user_id { + return Err(anyhow::anyhow!("cannot follow yourself")); + } + + let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); + let target_actor_url = crate::urls::actor_url(&self.base_url, target.id); + let target_inbox_url = format!("{}/inbox", target_actor_url); + let follow_id = activity_url(&self.base_url) + .map_err(|e| anyhow::anyhow!("{e}"))? + .to_string(); + + data.federation_repo + .add_follower( + target.id, + &follower_actor_url, + FollowerStatus::Accepted, + &follow_id, + ) + .await?; + + let target_as_remote = RemoteActor { + url: target_actor_url.to_string(), + handle: format!("{}@{}", target.username, data.domain), + inbox_url: target_inbox_url, + shared_inbox_url: None, + display_name: Some(target.username), + avatar_url: None, + outbox_url: None, + }; + data.federation_repo + .add_following(local_user_id, target_as_remote, &follow_id) + .await?; + + data.federation_repo + .update_following_status( + local_user_id, + target_actor_url.as_ref(), + FollowingStatus::Accepted, + ) + .await?; + + tracing::info!(follower = %local_user_id, followee = %target.id, "local follow"); + Ok(()) + } + + async fn unfollow_local( + &self, + local_user_id: uuid::Uuid, + target_actor_url: &str, + data: &activitypub_federation::config::Data, + ) -> anyhow::Result<()> { + let target_url = Url::parse(target_actor_url)?; + let target_user_id = crate::urls::extract_user_id_from_url(&target_url) + .ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?; + + let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); + + data.federation_repo + .remove_follower(target_user_id, &local_actor_url) + .await?; + data.federation_repo + .remove_following(local_user_id, target_actor_url) + .await?; + + tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow"); + Ok(()) + } + + pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build()?; + let data = self.federation_config.to_request_data(); + let actor = url::Url::parse(actor_url)?; + + let root: serde_json::Value = client + .get(outbox_url) + .header("Accept", "application/activity+json") + .send() + .await? + .json() + .await?; + + let first = match root.get("first").and_then(|v| v.as_str()) { + Some(url) => url.to_string(), + None => { + tracing::debug!(outbox = %outbox_url, "outbox has no first page, nothing to backfill"); + return Ok(()); + } + }; + + let mut current_url = first; + let mut visited = std::collections::HashSet::new(); + + loop { + if !visited.insert(current_url.clone()) { + tracing::warn!(url = %current_url, "backfill: loop detected, stopping"); + break; + } + + let page: serde_json::Value = match client + .get(¤t_url) + .header("Accept", "application/activity+json") + .send() + .await + { + Ok(resp) => match resp.json().await { + Ok(v) => v, + Err(e) => { + tracing::error!(error = %e, url = %current_url, "backfill: failed to parse page JSON"); + break; + } + }, + Err(e) => { + tracing::error!(error = %e, url = %current_url, "backfill: HTTP request failed"); + break; + } + }; + + if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) { + for item in items { + let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); + if activity_type != "Create" && activity_type != "Add" { + continue; + } + let object = match item.get("object") { + Some(o) if o.is_object() => o.clone(), + _ => continue, + }; + let ap_id = match object + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| url::Url::parse(s).ok()) + { + Some(u) => u, + None => continue, + }; + if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await { + tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item, skipping"); + } + } + } + + match page.get("next").and_then(|v| v.as_str()) { + Some(next) => current_url = next.to_string(), + None => break, + } + } + + tracing::info!(outbox = %outbox_url, pages = visited.len(), "backfill complete"); + Ok(()) + } + + fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { + let config = self.federation_config.clone(); + let base_url = self.base_url.clone(); + tokio::spawn(async move { + if let Err(e) = ActivityPubService::run_backfill( + config, + base_url, + owner_user_id, + follower_inbox_url, + ) + .await + { + tracing::warn!(error = %e, "backfill: task failed"); + } + }); + } + + async fn run_backfill( + config: ApFederationConfig, + base_url: String, + owner_user_id: uuid::Uuid, + follower_inbox_url: String, + ) -> anyhow::Result<()> { + const BATCH_SIZE: usize = 20; + + let data = config.to_request_data(); + let local_actor = get_local_actor(owner_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let inbox = Url::parse(&follower_inbox_url)?; + + let mut objects = data + .object_handler + .get_local_objects_for_user(owner_user_id) + .await?; + objects.reverse(); // oldest first → chronological feed + + let total = objects.len(); + let mut success_count = 0usize; + let mut failure_count = 0usize; + + for chunk in objects.chunks(BATCH_SIZE) { + for (ap_id, object_json) in chunk { + // Use a stable Create activity ID derived from the object's ap_id + let create_id = Url::parse(&format!( + "{}/activities/create/{}", + base_url, + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes()) + ))?; + + let create = CreateActivity { + id: create_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: object_json.clone(), + to: vec![], + cc: vec![], + }; + + let sends = SendActivityTask::prepare( + &WithContext::new_default(create), + &local_actor, + vec![inbox.clone()], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if failures.is_empty() { + success_count += 1; + } else { + failure_count += 1; + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + tracing::info!( + user_id = %owner_user_id, + follower = %follower_inbox_url, + sent = success_count, + failed = failure_count, + total = total, + "backfill complete" + ); + Ok(()) + } +} + +#[cfg(test)] +#[path = "tests/service.rs"] +mod tests; diff --git a/crates/adapters/activitypub-base/src/tests/actors.rs b/crates/adapters/activitypub-base/src/tests/actors.rs new file mode 100644 index 0000000..7f510c4 --- /dev/null +++ b/crates/adapters/activitypub-base/src/tests/actors.rs @@ -0,0 +1,49 @@ +use super::*; + +#[test] +fn person_serializes_with_enriched_fields() { + let person = Person { + kind: Default::default(), + id: "https://example.com/users/1" + .parse::() + .unwrap() + .into(), + preferred_username: "alice".to_string(), + inbox: "https://example.com/users/1/inbox".parse().unwrap(), + outbox: "https://example.com/users/1/outbox".parse().unwrap(), + followers: "https://example.com/users/1/followers".parse().unwrap(), + following: "https://example.com/users/1/following".parse().unwrap(), + public_key: PublicKey { + id: "https://example.com/users/1#main-key".to_string(), + owner: "https://example.com/users/1".parse().unwrap(), + public_key_pem: "pem".to_string(), + }, + name: Some("Alice".to_string()), + summary: Some("Bio text".to_string()), + icon: Some(ApImageObject { + kind: "Image".to_string(), + url: "https://example.com/images/avatars/1".parse().unwrap(), + }), + url: Some("https://example.com/u/alice".parse().unwrap()), + discoverable: Some(true), + manually_approves_followers: true, + updated: Some(Utc::now()), + endpoints: Some(Endpoints { + shared_inbox: "https://example.com/inbox".parse().unwrap(), + }), + image: None, + also_known_as: vec![], + attachment: vec![], + }; + let json = serde_json::to_value(&person).unwrap(); + assert_eq!(json["discoverable"], true); + assert_eq!(json["summary"], "Bio text"); + assert_eq!(json["icon"]["type"], "Image"); + assert_eq!(json["manuallyApprovesFollowers"], true); + assert!(json.get("updated").is_some()); + assert!(json.get("endpoints").is_some()); + assert_eq!( + json["endpoints"]["sharedInbox"], + "https://example.com/inbox" + ); +} diff --git a/crates/adapters/activitypub-base/src/tests/nodeinfo.rs b/crates/adapters/activitypub-base/src/tests/nodeinfo.rs new file mode 100644 index 0000000..898e1bf --- /dev/null +++ b/crates/adapters/activitypub-base/src/tests/nodeinfo.rs @@ -0,0 +1,40 @@ +use super::*; + +#[test] +fn nodeinfo_well_known_serializes_correctly() { + let doc = NodeInfoWellKnown { + links: vec![NodeInfoLink { + rel: "http://nodeinfo.diaspora.software/ns/schema/2.0".to_string(), + href: "https://example.com/nodeinfo/2.0".to_string(), + }], + }; + let json = serde_json::to_value(&doc).unwrap(); + assert_eq!( + json["links"][0]["rel"], + "http://nodeinfo.diaspora.software/ns/schema/2.0" + ); + assert_eq!(json["links"][0]["href"], "https://example.com/nodeinfo/2.0"); +} + +#[test] +fn nodeinfo_serializes_camel_case() { + let doc = NodeInfo { + version: "2.0".to_string(), + software: NodeInfoSoftware { + name: "my-app".to_string(), + version: "0.1.0".to_string(), + }, + protocols: vec!["activitypub".to_string()], + usage: NodeInfoUsage { + users: NodeInfoUsers { total: 3 }, + local_posts: 42, + }, + open_registrations: false, + }; + let json = serde_json::to_value(&doc).unwrap(); + assert_eq!(json["version"], "2.0"); + assert_eq!(json["software"]["name"], "my-app"); + assert_eq!(json["usage"]["users"]["total"], 3); + assert_eq!(json["usage"]["localPosts"], 42); + assert_eq!(json["openRegistrations"], false); +} diff --git a/crates/adapters/activitypub-base/src/tests/service.rs b/crates/adapters/activitypub-base/src/tests/service.rs new file mode 100644 index 0000000..336f589 --- /dev/null +++ b/crates/adapters/activitypub-base/src/tests/service.rs @@ -0,0 +1,45 @@ +use super::*; +use crate::repository::{Follower, FollowerStatus, RemoteActor}; + +fn make_follower(inbox: &str, shared: Option<&str>) -> Follower { + Follower { + actor: RemoteActor { + url: format!("https://remote/{}", inbox), + handle: "user".to_string(), + inbox_url: inbox.to_string(), + shared_inbox_url: shared.map(|s| s.to_string()), + display_name: None, + avatar_url: None, + outbox_url: None, + }, + status: FollowerStatus::Accepted, + } +} + +#[test] +fn collect_inboxes_deduplicates_shared() { + let followers = vec![ + make_follower( + "https://mastodon.social/users/a/inbox", + Some("https://mastodon.social/inbox"), + ), + make_follower( + "https://mastodon.social/users/b/inbox", + Some("https://mastodon.social/inbox"), + ), + make_follower("https://other.instance/users/c/inbox", None), + ]; + let inboxes = collect_inboxes(&followers); + assert_eq!(inboxes.len(), 2); + let strs: Vec<_> = inboxes.iter().map(|u| u.as_str()).collect(); + assert!(strs.contains(&"https://mastodon.social/inbox")); + assert!(strs.contains(&"https://other.instance/users/c/inbox")); +} + +#[test] +fn collect_inboxes_falls_back_to_individual_inbox() { + let followers = vec![make_follower("https://example.com/users/x/inbox", None)]; + let inboxes = collect_inboxes(&followers); + assert_eq!(inboxes.len(), 1); + assert_eq!(inboxes[0].as_str(), "https://example.com/users/x/inbox"); +} diff --git a/crates/adapters/activitypub-base/src/urls.rs b/crates/adapters/activitypub-base/src/urls.rs new file mode 100644 index 0000000..2f8f003 --- /dev/null +++ b/crates/adapters/activitypub-base/src/urls.rs @@ -0,0 +1,30 @@ +use url::Url; + +use crate::error::Error; + +pub const AS_PUBLIC: &str = "https://www.w3.org/ns/activitystreams#Public"; + +pub fn extract_user_id_from_url(url: &Url) -> Option { + let path = url.path(); + path.strip_prefix("/users/") + .and_then(|s| s.split('/').next()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) +} + +pub fn activity_url(base_url: &str) -> Result { + Url::parse(&format!("{}/activities/{}", base_url, uuid::Uuid::new_v4())) + .map_err(|e| Error::bad_request(anyhow::anyhow!(e))) +} + +pub fn actor_url(base_url: &str, user_id: uuid::Uuid) -> Url { + Url::parse(&format!("{}/users/{}", base_url, user_id)) + .expect("base_url is always a valid URL prefix") +} + +/// Extract the username segment from a /users/:username URL. +pub fn extract_username_from_url(url: &Url) -> Option { + url.path() + .strip_prefix("/users/") + .and_then(|s| s.split('/').next()) + .map(|s| s.to_string()) +} diff --git a/crates/adapters/activitypub-base/src/user.rs b/crates/adapters/activitypub-base/src/user.rs new file mode 100644 index 0000000..a99092b --- /dev/null +++ b/crates/adapters/activitypub-base/src/user.rs @@ -0,0 +1,27 @@ +use async_trait::async_trait; +use url::Url; + +#[derive(Debug, Clone)] +pub struct ApProfileField { + pub name: String, + pub value: String, +} + +#[derive(Debug, Clone)] +pub struct ApUser { + pub id: uuid::Uuid, + pub username: String, + pub bio: Option, + pub avatar_url: Option, + pub banner_url: Option, + pub also_known_as: Option, + pub profile_url: Option, + pub attachment: Vec, +} + +#[async_trait] +pub trait ApUserRepository: Send + Sync { + async fn find_by_id(&self, id: uuid::Uuid) -> anyhow::Result>; + async fn find_by_username(&self, username: &str) -> anyhow::Result>; + async fn count_users(&self) -> anyhow::Result; +} diff --git a/crates/adapters/activitypub-base/src/webfinger.rs b/crates/adapters/activitypub-base/src/webfinger.rs new file mode 100644 index 0000000..8754287 --- /dev/null +++ b/crates/adapters/activitypub-base/src/webfinger.rs @@ -0,0 +1,38 @@ +use activitypub_federation::{ + config::Data, + fetch::webfinger::{Webfinger, build_webfinger_response, extract_webfinger_name}, +}; +use axum::{ + extract::Query, + http::header, + response::{IntoResponse, Response}, +}; +use serde::Deserialize; + +use crate::data::FederationData; +use crate::error::Error; + +#[derive(Deserialize)] +pub struct WebfingerQuery { + resource: String, +} + +pub async fn webfinger_handler( + Query(query): Query, + data: Data, +) -> Result { + let name = extract_webfinger_name(&query.resource, &data)?; + + let user = data + .user_repo + .find_by_username(name) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let ap_id = crate::urls::actor_url(&data.base_url, user.id); + + let wf: Webfinger = build_webfinger_response(query.resource, ap_id); + let body = serde_json::to_string(&wf).map_err(|e| Error::from(anyhow::anyhow!(e)))?; + Ok(([(header::CONTENT_TYPE, "application/jrd+json")], body).into_response()) +}