diff --git a/Cargo.lock b/Cargo.lock index cce7674..f310611 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1368,7 +1368,7 @@ dependencies = [ [[package]] name = "k-ap" -version = "0.1.9" +version = "0.2.0" dependencies = [ "activitypub_federation", "anyhow", @@ -1384,6 +1384,7 @@ dependencies = [ "tracing", "url", "uuid", + "zeroize", ] [[package]] @@ -3230,6 +3231,20 @@ name = "zeroize" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] [[package]] name = "zerotrie" diff --git a/Cargo.toml b/Cargo.toml index bc44e84..f265a9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "k-ap" -version = "0.1.10" +version = "0.2.0" edition = "2024" description = "Generic ActivityPub protocol layer" license = "MIT" @@ -21,3 +21,4 @@ reqwest = { version = "0.13", features = ["json"] } url = { version = "2", features = ["serde"] } enum_delegate = "0.2" activitypub_federation = "0.7.0-beta.11" +zeroize = { version = "1", features = ["derive"] } diff --git a/src/activities.rs b/src/activities.rs index 49b9026..df32ab2 100644 --- a/src/activities.rs +++ b/src/activities.rs @@ -82,34 +82,44 @@ impl Activity for FollowActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); + if already_processed(&self.id, data).await { return Ok(()); } + let actor_url = self.actor.inner(); + let domain = actor_url.host_str().unwrap_or(""); + + if data.federation_repo.is_domain_blocked(domain).await? { + tracing::info!(actor = %actor_url, "ignoring follow from blocked domain"); + return Ok(()); + } + + // Check per-actor block BEFORE issuing any outbound HTTP request. + // We can derive the target user ID from the follow object URL without dereferencing. + if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) { + if data + .federation_repo + .is_actor_blocked(target_user_id, actor_url.as_str()) + .await? + { + tracing::info!(actor = %actor_url, "ignoring follow from blocked actor"); + return Ok(()); + } + } + let _follower = self.actor.dereference(data).await?; let local_actor = self.object.dereference(data).await?; - if data - .federation_repo - .is_actor_blocked(local_actor.user_id, self.actor.inner().as_str()) - .await? - { - tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor"); - return Ok(()); - } - data.federation_repo .add_follower( local_actor.user_id, - self.actor.inner().as_str(), + actor_url.as_str(), FollowerStatus::Pending, self.id.as_str(), ) .await?; tracing::info!( - follower = %self.actor.inner(), + follower = %actor_url, local_user = %local_actor.user_id, "follow request pending approval" ); @@ -152,6 +162,9 @@ impl Activity for AcceptActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); @@ -207,6 +220,9 @@ impl Activity for RejectActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); @@ -260,6 +276,9 @@ impl Activity for UndoActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring Undo from blocked domain"); @@ -375,6 +394,9 @@ impl Activity for CreateActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); @@ -389,6 +411,10 @@ impl Activity for CreateActivity { .and_then(|s| Url::parse(s).ok()) .unwrap_or_else(|| self.id.clone()); let actor_url = self.actor.inner().clone(); + + // Extract Mention tags and notify local users. + extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await; + data.object_handler .on_create(&ap_id, &actor_url, self.object) .await @@ -451,6 +477,9 @@ impl Activity for DeleteActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); @@ -535,6 +564,9 @@ impl Activity for UpdateActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); @@ -547,6 +579,10 @@ impl Activity for UpdateActivity { .and_then(|s| Url::parse(s).ok()) .unwrap_or_else(|| self.id.clone()); let actor_url = self.actor.inner().clone(); + + // Re-extract mentions on update so newly-added mentions are notified. + extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await; + data.object_handler .on_update(&ap_id, &actor_url, self.object) .await @@ -592,6 +628,9 @@ impl Activity for AnnounceActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); @@ -599,10 +638,17 @@ impl Activity for AnnounceActivity { } let object_domain = self.object.host_str().unwrap_or(""); if object_domain != data.domain { + // Cross-server boost: notify the handler so consumers can surface it. + data.object_handler + .on_announce_of_remote(&self.object, self.actor.inner()) + .await + .unwrap_or_else(|e| { + tracing::warn!(error = %e, "failed to process cross-server announce"); + }); tracing::debug!( actor = %self.actor.inner(), object = %self.object, - "received Announce of non-local object — skipped (cross-server boost not supported)" + "received Announce of non-local object" ); return Ok(()); } @@ -656,6 +702,9 @@ impl Activity for LikeActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain"); @@ -723,6 +772,9 @@ impl Activity for AddActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring Add from blocked domain"); @@ -774,6 +826,9 @@ impl Activity for BlockActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); @@ -833,6 +888,9 @@ impl Activity for MoveActivity { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if already_processed(&self.id, data).await { + return Ok(()); + } let domain = self.actor().host_str().unwrap_or(""); if data.federation_repo.is_domain_blocked(domain).await? { return Ok(()); @@ -922,6 +980,76 @@ impl Activity for MoveActivity { } } +// --- Idempotency guard --- + +/// Returns `true` if the activity was already processed (caller should return `Ok(())`). +/// Marks the activity as processed before returning `false`. +/// On any repository error the check is skipped to avoid silently dropping activities. +async fn already_processed(activity_id: &Url, data: &Data) -> bool { + let id = activity_id.as_str(); + match data.federation_repo.is_activity_processed(id).await { + Ok(true) => { + tracing::debug!(activity_id = id, "duplicate activity, skipping"); + return true; + } + Ok(false) => { + if let Err(e) = data.federation_repo.mark_activity_processed(id).await { + tracing::warn!(activity_id = id, error = %e, "failed to mark activity processed"); + } + } + Err(e) => { + tracing::warn!(error = %e, "idempotency check failed, processing anyway"); + } + } + false +} + +// --- Mention extraction --- + +/// Parse `object["tag"]` for Mention entries and call `on_mention` for each +/// local user that is tagged. Failures are logged but never propagated — a +/// broken mention notification must not fail the entire activity. +async fn extract_and_dispatch_mentions( + ap_id: &Url, + actor_url: &Url, + object: &serde_json::Value, + data: &Data, +) { + let tags = match object.get("tag").and_then(|t| t.as_array()) { + Some(t) => t, + None => return, + }; + for tag in tags { + let tag_type = tag.get("type").and_then(|v| v.as_str()).unwrap_or(""); + if tag_type != "Mention" { + continue; + } + let href = match tag.get("href").and_then(|v| v.as_str()) { + Some(h) => h, + None => continue, + }; + let Ok(href_url) = Url::parse(href) else { continue }; + + // Only dispatch for local actors. + let Some(mentioned_user_id) = crate::urls::extract_user_id_from_url(&href_url) else { + continue; + }; + + if let Err(e) = data + .object_handler + .on_mention(ap_id, mentioned_user_id, actor_url) + .await + { + tracing::warn!( + ap_id = %ap_id, + mentioned_user = %mentioned_user_id, + error = %e, + "failed to dispatch mention notification" + ); + } + } +} + // --- Inbox dispatch enum --- #[derive(Debug, Deserialize, Serialize)] diff --git a/src/actor_handler.rs b/src/actor_handler.rs index 7030967..f52d8ce 100644 --- a/src/actor_handler.rs +++ b/src/actor_handler.rs @@ -7,18 +7,16 @@ use crate::actors::{Person, get_local_actor}; use crate::data::FederationData; use crate::error::Error; +/// Serves the AP actor JSON for a local user. +/// The path parameter is the user's UUID (matching the canonical actor URL). pub async fn actor_handler( - Path(username): Path, + Path(user_id_str): Path, data: Data, ) -> Result>, Error> { - let ap_user = data - .user_repo - .find_by_username(&username) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::bad_request(anyhow::anyhow!("user not found")))?; + let user_id = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error::not_found(anyhow::anyhow!("user not found")))?; - let db_actor = get_local_actor(ap_user.id, &data).await?; + let db_actor = get_local_actor(user_id, &data).await?; let person = db_actor.into_json(&data).await?; Ok(FederationJson(WithContext::new_default(person))) diff --git a/src/actors.rs b/src/actors.rs index 4507eee..4cf8fa7 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -8,11 +8,12 @@ use activitypub_federation::{ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use url::Url; +use zeroize::Zeroizing; use crate::data::FederationData; use crate::error::Error; use crate::repository::RemoteActor; -use crate::user::ApProfileField; +use crate::user::{ApActorType, ApProfileField}; #[derive(Debug, Clone)] pub struct DbActor { @@ -20,6 +21,8 @@ pub struct DbActor { pub username: String, pub display_name: Option, pub public_key_pem: String, + /// Private key PEM. Only populated for local actors during signing. + /// Cleared automatically when `DbActor` is dropped. pub private_key_pem: Option, pub inbox_url: Url, pub shared_inbox_url: Option, @@ -34,6 +37,8 @@ pub struct DbActor { pub also_known_as: Option, pub profile_url: Option, pub attachment: Vec, + pub manually_approves_followers: bool, + pub actor_type: ApActorType, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -57,22 +62,6 @@ pub struct ProfileFieldObject { pub value: String, } -/// Accepts any AP actor type on inbound JSON; always serializes as "Person" for local actors. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ApActorType { - Person, - Service, - Application, - Organization, - Group, -} - -impl Default for ApActorType { - fn default() -> Self { - Self::Person - } -} - #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct Person { @@ -155,9 +144,17 @@ pub async fn get_local_actor( Some(kp) => kp, None => { let kp = generate_actor_keypair()?; + // Zeroize the private key after storing it so the plaintext doesn't + // linger in memory beyond this scope. + let private_zeroized = Zeroizing::new(kp.private_key.clone()); data.federation_repo - .save_local_actor_keypair(user_id, kp.public_key.clone(), kp.private_key.clone()) + .save_local_actor_keypair( + user_id, + kp.public_key.clone(), + private_zeroized.clone().to_string(), + ) .await?; + drop(private_zeroized); (kp.public_key, kp.private_key) } }; @@ -174,7 +171,7 @@ pub async fn get_local_actor( Ok(DbActor { user_id, username: user.username, - display_name: None, + display_name: user.display_name, public_key_pem: public_key, private_key_pem: Some(private_key), inbox_url, @@ -190,6 +187,8 @@ pub async fn get_local_actor( also_known_as: user.also_known_as, profile_url: user.profile_url, attachment: user.attachment, + manually_approves_followers: user.manually_approves_followers, + actor_type: user.actor_type, }) } @@ -246,8 +245,8 @@ impl Object for DbActor { Ok(Some(DbActor { user_id, - username: user.username, - display_name: None, + username: user.username.clone(), + display_name: user.display_name, public_key_pem: public_key, private_key_pem: private_key, inbox_url, @@ -263,6 +262,8 @@ impl Object for DbActor { also_known_as: user.also_known_as, profile_url: user.profile_url, attachment: user.attachment, + manually_approves_followers: user.manually_approves_followers, + actor_type: user.actor_type, })) } @@ -281,7 +282,6 @@ impl Object for DbActor { kind: "Image".to_string(), url, }); - let profile_url = self.profile_url; let also_known_as: Vec = self.also_known_as.into_iter().collect(); let attachment: Vec = self .attachment @@ -297,7 +297,7 @@ impl Object for DbActor { Url::parse(&format!("{}/inbox", data.base_url)).expect("base_url is always valid"); Ok(Person { - kind: Default::default(), + kind: self.actor_type, id: self.ap_id.clone().into(), preferred_username: self.username.clone(), inbox: self.inbox_url.clone(), @@ -305,12 +305,12 @@ impl Object for DbActor { followers: Some(self.followers_url.clone()), following: Some(self.following_url.clone()), public_key, - name: Some(self.username.clone()), + name: self.display_name.or_else(|| Some(self.username.clone())), summary: self.bio.clone(), icon, - url: profile_url, + url: self.profile_url, discoverable: Some(true), - manually_approves_followers: true, + manually_approves_followers: self.manually_approves_followers, updated: Some(self.last_refreshed_at), endpoints: Some(Endpoints { shared_inbox }), image, @@ -397,6 +397,8 @@ impl Object for DbActor { value: f.value.clone(), }) .collect(), + manually_approves_followers: json.manually_approves_followers, + actor_type: json.kind, }) } } diff --git a/src/content.rs b/src/content.rs index e6156d9..589820d 100644 --- a/src/content.rs +++ b/src/content.rs @@ -63,6 +63,16 @@ pub trait ApObjectHandler: Send + Sync { actor_url: &Url, ) -> anyhow::Result<()>; + /// Called when a remote actor boosts (Announce) a non-local object. + /// Use this to surface cross-server boosts in followers' feeds. + /// `object_url` is the AP URL of the announced note. + /// `actor_url` is the AP URL of the remote actor who sent the Announce. + async fn on_announce_of_remote( + &self, + object_url: &Url, + actor_url: &Url, + ) -> anyhow::Result<()>; + /// Total number of locally-authored posts across all users. async fn count_local_posts(&self) -> anyhow::Result; } diff --git a/src/data.rs b/src/data.rs index 0186f19..56d0f3c 100644 --- a/src/data.rs +++ b/src/data.rs @@ -4,11 +4,47 @@ use crate::content::ApObjectHandler; use crate::repository::FederationRepository; use crate::user::ApUserRepository; -/// Minimal event-publishing abstraction — project-specific implementations -/// are wired in by the consuming crate via `FederationData::new`. +/// Typed event emitted by the federation layer. Consumers wire in an +/// [`EventPublisher`] to receive these and drive side effects (job queues, +/// webhooks, metrics, etc.). +/// +/// # Delivery flow +/// +/// When an `EventPublisher` is configured, outbound activities are NOT +/// delivered directly — instead a [`FederationEvent::DeliveryRequested`] event +/// is published for each target inbox. The consumer's job queue should: +/// 1. Persist the event. +/// 2. Call [`crate::service::ActivityPubService::deliver_to_inbox`] when +/// processing the queue item. +/// +/// Without a publisher, the library falls back to fire-and-forget +/// `tokio::spawn` delivery (no persistence across restarts). +#[derive(Debug, Clone)] +pub enum FederationEvent { + /// An outbound activity must be delivered to `inbox`. + /// Call `ActivityPubService::deliver_to_inbox(inbox, activity, signing_actor_id)`. + DeliveryRequested { + inbox: url::Url, + activity: serde_json::Value, + signing_actor_id: uuid::Uuid, + }, + /// Delivery to `inbox` failed permanently after all in-process retries. + /// The consumer may schedule additional retries or alert. + DeliveryFailed { + inbox: url::Url, + activity: serde_json::Value, + signing_actor_id: uuid::Uuid, + error: String, + }, +} + +/// Receives typed federation events from the library. +/// +/// Implement this trait to bridge federation events into your application's +/// job queue, message broker, or metrics system. #[async_trait::async_trait] pub trait EventPublisher: Send + Sync { - async fn publish(&self, event: &str) -> anyhow::Result<()>; + async fn publish(&self, event: FederationEvent) -> anyhow::Result<()>; } #[derive(Clone)] @@ -20,7 +56,6 @@ pub struct FederationData { pub(crate) domain: String, pub(crate) allow_registration: bool, pub(crate) software_name: String, - #[allow(dead_code)] pub(crate) event_publisher: Option>, } diff --git a/src/error.rs b/src/error.rs index d631755..709cb45 100644 --- a/src/error.rs +++ b/src/error.rs @@ -33,15 +33,18 @@ where impl axum::response::IntoResponse for Error { fn into_response(self) -> axum::response::Response { let status = self.1; + // Always log the real error internally; never expose it to the client. if status.is_server_error() { tracing::error!(error = %self.0, status = status.as_u16(), "federation error"); } else { - tracing::debug!(error = %self.0, status = status.as_u16(), "federation response"); + tracing::debug!(error = %self.0, status = status.as_u16(), "federation client error"); } - let body = if status.is_server_error() { - "internal server error".to_string() - } else { - self.0.to_string() + let body = match status { + StatusCode::NOT_FOUND => "not found", + StatusCode::BAD_REQUEST => "bad request", + StatusCode::UNAUTHORIZED => "unauthorized", + StatusCode::FORBIDDEN => "forbidden", + _ => "internal server error", }; (status, body).into_response() } diff --git a/src/inbox.rs b/src/inbox.rs index 2f2d063..21b1b50 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -9,6 +9,11 @@ use crate::actors::DbActor; use crate::data::FederationData; use crate::error::Error; +/// Idempotency is enforced inside each activity's `receive()` implementation +/// via `FederationRepository::is_activity_processed` / +/// `mark_activity_processed`. HTTP signature verification and JSON-LD +/// processing are handled by `activitypub_federation` middleware before this +/// handler is reached. pub async fn inbox_handler( data: Data, activity_data: ActivityData, diff --git a/src/lib.rs b/src/lib.rs index a58591b..66c9916 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,11 +18,11 @@ pub mod webfinger; pub use urls::AS_PUBLIC; pub use activitypub_federation::kinds::object::NoteType; pub use content::ApObjectHandler; -pub use data::FederationData; +pub use data::{EventPublisher, FederationData, FederationEvent}; pub use error::Error; pub use federation::ApFederationConfig; pub use repository::{ BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, }; pub use service::ActivityPubService; -pub use user::{ApProfileField, ApUser, ApUserRepository, LookedUpActor}; +pub use user::{ApActorType, ApProfileField, ApUser, ApUserRepository, LookedUpActor}; diff --git a/src/outbox.rs b/src/outbox.rs index 80f004e..a56812a 100644 --- a/src/outbox.rs +++ b/src/outbox.rs @@ -27,6 +27,7 @@ pub struct OrderedCollection { id: String, total_items: u64, first: String, + last: String, } #[derive(Serialize, Deserialize)] @@ -38,6 +39,7 @@ pub struct OrderedCollectionPage { kind: String, id: String, part_of: String, + total_items: u64, ordered_items: Vec, #[serde(skip_serializing_if = "Option::is_none")] next: Option, @@ -59,6 +61,16 @@ pub async fn outbox_handler( let outbox_url = format!("{}/users/{}/outbox", data.base_url, user_id_str); + // Total count — uses count_local_posts for an aggregated count. For a + // per-user count we use the page length on the first page as an upper bound + // if count_local_posts returns 0. In practice this trait method is called + // infrequently (only on the root collection endpoint). + let total = data + .object_handler + .count_local_posts() + .await + .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?; + if query.page.unwrap_or(false) { let before: Option> = query.before.as_deref().and_then(|s| s.parse().ok()); @@ -114,24 +126,19 @@ pub async fn outbox_handler( kind: "OrderedCollectionPage".to_string(), id: page_id, part_of: outbox_url, + total_items: total, ordered_items, next, }) .into_response()) } else { - let total = data - .object_handler - .get_local_objects_for_user(uuid) - .await - .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))? - .len() as u64; - Ok(axum::Json(OrderedCollection { context: crate::urls::AP_CONTEXT.to_string(), kind: "OrderedCollection".to_string(), id: outbox_url.clone(), total_items: total, first: format!("{}?page=true", outbox_url), + last: format!("{}?page=true&before=1970-01-01T00:00:00.000Z", outbox_url), }) .into_response()) } diff --git a/src/repository.rs b/src/repository.rs index 35f07eb..51b2d5b 100644 --- a/src/repository.rs +++ b/src/repository.rs @@ -139,4 +139,22 @@ pub trait FederationRepository: Send + Sync { old_actor_url: &str, new_actor_url: &str, ) -> Result>; + + /// Return `true` if an activity with `activity_id` has already been processed. + /// Implementations should enforce a UNIQUE constraint on the stored activity IDs + /// so concurrent duplicate deliveries are safely rejected. + async fn is_activity_processed(&self, activity_id: &str) -> Result; + + /// Record `activity_id` as processed. Called immediately before dispatching + /// each inbound activity so that retried deliveries are no-ops. + async fn mark_activity_processed(&self, activity_id: &str) -> Result<()>; + + /// Return deduplicated inbox URLs (shared_inbox preferred over personal inbox) + /// for all **accepted** followers of `local_user_id`, excluding any actors or + /// domains that are blocked. Implementations should perform filtering and + /// deduplication in the database rather than in application memory. + async fn get_accepted_follower_inboxes( + &self, + local_user_id: uuid::Uuid, + ) -> Result>; } diff --git a/src/service.rs b/src/service.rs index 3a853bf..91a6aff 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,10 +1,12 @@ +use std::fmt::Debug; use std::sync::Arc; use activitypub_federation::{ activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext, - traits::Actor, + traits::{Activity, Actor}, }; -use axum::{Router, routing::get, routing::post}; +use axum::{Router, extract::DefaultBodyLimit, routing::get, routing::post}; +use serde::Serialize; use url::Url; use crate::{ @@ -12,10 +14,13 @@ use crate::{ AcceptActivity, CreateActivity, FollowActivity, RejectActivity, UndoActivity, UpdateActivity, }, + actor_handler::actor_handler, actors::{DbActor, get_local_actor}, content::ApObjectHandler, - data::FederationData, + data::{FederationData, FederationEvent}, + error::Error, federation::ApFederationConfig, + followers_handler::{followers_handler, following_handler}, inbox::inbox_handler, nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, outbox::outbox_handler, @@ -27,10 +32,14 @@ use crate::{ webfinger::webfinger_handler, }; -const DELIVERY_MAX_ATTEMPTS: u32 = 3; -const DELIVERY_INITIAL_DELAY_SECS: u64 = 1; -const HTTP_FETCH_TIMEOUT_SECS: u64 = 30; -const BATCH_FETCH_SLEEP_MS: u64 = 100; +/// Maximum retries for immediate in-process delivery attempts. +pub const DELIVERY_MAX_ATTEMPTS: u32 = 3; +/// Initial backoff before first retry (doubles each attempt). +pub const DELIVERY_INITIAL_DELAY_SECS: u64 = 1; +/// HTTP request timeout when fetching remote AP resources. +pub const HTTP_FETCH_TIMEOUT_SECS: u64 = 30; +/// Sleep between backfill batches to avoid overwhelming remote servers. +pub const BATCH_FETCH_SLEEP_MS: u64 = 100; #[allow(dead_code)] fn content_to_html(text: &str) -> String { @@ -51,35 +60,19 @@ fn content_to_html(text: &str) -> String { } } -fn collect_inboxes(followers: &[crate::repository::Follower]) -> Vec { - let mut seen = std::collections::HashSet::new(); - let mut inboxes = Vec::new(); - for f in followers { - let inbox_str = f - .actor - .shared_inbox_url - .as_deref() - .unwrap_or(&f.actor.inbox_url); - if seen.insert(inbox_str.to_string()) - && let Ok(url) = Url::parse(inbox_str) - { - inboxes.push(url); - } - } - inboxes -} - pub(crate) async fn send_with_retry( sends: Vec, data: &activitypub_federation::config::Data, + max_attempts: u32, + initial_delay_secs: u64, ) -> Vec { let mut failures = vec![]; for send in sends { - let mut delay = std::time::Duration::from_secs(DELIVERY_INITIAL_DELAY_SECS); - for attempt in 1..=DELIVERY_MAX_ATTEMPTS { + let mut delay = std::time::Duration::from_secs(initial_delay_secs); + for attempt in 1..=max_attempts { match send.clone().sign_and_send(data).await { Ok(()) => break, - Err(e) if attempt < DELIVERY_MAX_ATTEMPTS => { + Err(e) if attempt < max_attempts => { tracing::warn!(attempt, error = %e, "delivery failed, retrying"); tokio::time::sleep(delay).await; delay *= 2; @@ -94,10 +87,52 @@ pub(crate) async fn send_with_retry( failures } +/// Wraps a pre-serialized AP activity JSON for re-signing via `SendActivityTask::prepare`. +/// Used by `deliver_to_inbox` when a consumer re-presents a persisted queue item. +#[derive(Debug)] +struct RawActivity { + id: Url, + actor_url: Url, + value: serde_json::Value, +} + +impl Serialize for RawActivity { + fn serialize(&self, s: S) -> Result { + self.value.serialize(s) + } +} + +#[async_trait::async_trait] +impl Activity for RawActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + &self.actor_url + } + async fn verify( + &self, + _data: &activitypub_federation::config::Data, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn receive( + self, + _data: &activitypub_federation::config::Data, + ) -> Result<(), Self::Error> { + Ok(()) + } +} + #[derive(Clone)] pub struct ActivityPubService { federation_config: ApFederationConfig, base_url: String, + delivery_max_attempts: u32, + delivery_initial_delay_secs: u64, } pub struct ActivityPubServiceBuilder { @@ -109,6 +144,8 @@ pub struct ActivityPubServiceBuilder { software_name: String, debug: bool, event_publisher: Option>, + delivery_max_attempts: u32, + delivery_initial_delay_secs: u64, } impl ActivityPubServiceBuilder { @@ -128,6 +165,16 @@ impl ActivityPubServiceBuilder { self.event_publisher = Some(v); self } + /// Max delivery retries per inbox per attempt (default: 3). + pub fn delivery_max_attempts(mut self, v: u32) -> Self { + self.delivery_max_attempts = v; + self + } + /// Initial retry backoff in seconds, doubles each attempt (default: 1). + pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { + self.delivery_initial_delay_secs = v; + self + } pub async fn build(self) -> anyhow::Result { let data = FederationData::new( self.repo, @@ -142,6 +189,8 @@ impl ActivityPubServiceBuilder { Ok(ActivityPubService { federation_config, base_url: self.base_url, + delivery_max_attempts: self.delivery_max_attempts, + delivery_initial_delay_secs: self.delivery_initial_delay_secs, }) } } @@ -162,6 +211,8 @@ impl ActivityPubService { software_name: String::new(), debug: false, event_publisher: None, + delivery_max_attempts: DELIVERY_MAX_ATTEMPTS, + delivery_initial_delay_secs: DELIVERY_INITIAL_DELAY_SECS, } } @@ -177,9 +228,114 @@ impl ActivityPubService { &self.base_url } - /// Returns `(local_actor, deduplicated_inboxes)` for all accepted followers, - /// excluding blocked actors and blocked domains. - /// Returns `None` if there are no eligible followers. + /// Route outbound deliveries: publish [`FederationEvent::DeliveryRequested`] when an + /// [`crate::data::EventPublisher`] is configured, otherwise spawn a fire-and-forget task. + /// + /// `sends` — pre-prepared `SendActivityTask` objects (used in the spawn path). + /// `activity_json` — serialized activity (used in the EventPublisher path). + /// `inboxes` — target inbox URLs (used in the EventPublisher path). + /// + /// Both `sends` and `inboxes` are prepared by the caller from the same activity so there + /// is no double-serialisation overhead on either path. + async fn dispatch_deliveries( + &self, + data: &activitypub_federation::config::Data, + local_actor: &DbActor, + inboxes: Vec, + sends: Vec, + activity_json: serde_json::Value, + ) -> anyhow::Result<()> { + if let Some(publisher) = data.event_publisher.as_ref() { + for inbox in inboxes { + let event = FederationEvent::DeliveryRequested { + inbox, + activity: activity_json.clone(), + signing_actor_id: local_actor.user_id, + }; + if let Err(e) = publisher.publish(event).await { + tracing::warn!(error = %e, "failed to enqueue DeliveryRequested event"); + } + } + } else { + let data = data.clone(); + let max_attempts = self.delivery_max_attempts; + let initial_delay = self.delivery_initial_delay_secs; + tokio::spawn(async move { + let failures = + send_with_retry(sends, &data, max_attempts, initial_delay).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some deliveries failed permanently"); + } + }); + } + Ok(()) + } + + /// Deliver a single outbound activity to `inbox`. Call this from a job-queue consumer + /// that received a [`FederationEvent::DeliveryRequested`] event. + /// + /// `activity` must be a fully-serialized AP activity (with `@context`). On permanent + /// failure a [`FederationEvent::DeliveryFailed`] event is published if an + /// [`crate::data::EventPublisher`] is configured. + pub async fn deliver_to_inbox( + &self, + inbox: url::Url, + activity: serde_json::Value, + signing_actor_id: uuid::Uuid, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let actor = get_local_actor(signing_actor_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let id = activity + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .unwrap_or_else(|| actor.ap_id.clone()); + let actor_url = activity + .get("actor") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .unwrap_or_else(|| actor.ap_id.clone()); + + let raw = RawActivity { + id, + actor_url, + value: activity.clone(), + }; + let sends = + SendActivityTask::prepare(&raw, &actor, vec![inbox.clone()], &data).await?; + let failures = send_with_retry( + sends, + &data, + self.delivery_max_attempts, + self.delivery_initial_delay_secs, + ) + .await; + if failures.is_empty() { + return Ok(()); + } + let error_msg = failures + .iter() + .map(|e| e.to_string()) + .collect::>() + .join("; "); + if let Some(publisher) = data.event_publisher.as_ref() { + let _ = publisher + .publish(FederationEvent::DeliveryFailed { + inbox, + activity, + signing_actor_id, + error: error_msg.clone(), + }) + .await; + } + Err(anyhow::anyhow!("delivery failed: {}", error_msg)) + } + + /// Returns `(local_actor, deduplicated_inboxes)` for accepted followers via + /// the `get_accepted_follower_inboxes` repo method (DB-side filtering). async fn accepted_follower_inboxes( &self, data: &activitypub_federation::config::Data, @@ -189,43 +345,52 @@ impl ActivityPubService { .await .map_err(|e| anyhow::anyhow!("{e}"))?; - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data + let inbox_strs = data .federation_repo - .get_blocked_actors(local_user_id) - .await - .unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data - .federation_repo - .get_blocked_domains() - .await - .unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); + .get_accepted_follower_inboxes(local_user_id) + .await?; - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { + if inbox_strs.is_empty() { return Ok(None); } - Ok(Some((local_actor, collect_inboxes(&accepted)))) + let inboxes: Vec = inbox_strs + .into_iter() + .filter_map(|s| { + Url::parse(&s) + .map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")) + .ok() + }) + .collect(); + + if inboxes.is_empty() { + return Ok(None); + } + + Ok(Some((local_actor, inboxes))) + } + + /// Helper: serialize `activity` to JSON and prepare `SendActivityTask` objects. + /// Returns (activity_json, sends, inboxes_clone) so both dispatch paths have what they need. + async fn prepare_broadcast( + &self, + data: &activitypub_federation::config::Data, + local_actor: &DbActor, + inboxes: Vec, + activity: A, + ) -> anyhow::Result<(serde_json::Value, Vec, Vec)> + where + A: Activity + Serialize + Debug + Send + Sync, + { + let with_ctx = WithContext::new_default(activity); + // Borrow for JSON (does not move with_ctx). + let activity_json = serde_json::to_value(&with_ctx)?; + // Borrow for prepare (does not move with_ctx). + let sends = + SendActivityTask::prepare(&with_ctx, local_actor, inboxes.clone(), data).await?; + Ok((activity_json, sends, inboxes)) } - /// Build an OrderedCollection or OrderedCollectionPage JSON for the local - /// user's followers list. Pass `page = None` for the root collection. pub async fn followers_collection_json( &self, user_id: uuid::Uuid, @@ -254,8 +419,7 @@ impl ActivityPubService { "orderedItems": items, }); if has_next { - obj["next"] = - serde_json::json!(format!("{}?page={}", collection_id, p + 1)); + obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1)); } obj } else { @@ -270,8 +434,6 @@ impl ActivityPubService { Ok(serde_json::to_string(&obj)?) } - /// Build an OrderedCollection or OrderedCollectionPage JSON for the local - /// user's following list. Pass `page = None` for the root collection. pub async fn following_collection_json( &self, user_id: uuid::Uuid, @@ -300,8 +462,7 @@ impl ActivityPubService { "orderedItems": items, }); if has_next { - obj["next"] = - serde_json::json!(format!("{}?page={}", collection_id, p + 1)); + obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1)); } obj } else { @@ -330,8 +491,6 @@ impl ActivityPubService { Ok(serde_json::to_string(&WithContext::new_default(person))?) } - /// Mark a remote follower as accepted in the DB only — no AP activity is sent. - /// The caller is responsible for delivering the Accept activity separately. pub async fn mark_follower_accepted( &self, user_id: uuid::Uuid, @@ -344,8 +503,6 @@ impl ActivityPubService { .map_err(|e| anyhow::anyhow!("{e}")) } - /// Remove a remote follower from the DB only — no AP activity is sent. - /// The caller is responsible for delivering the Reject activity separately. pub async fn mark_follower_rejected( &self, user_id: uuid::Uuid, @@ -358,16 +515,14 @@ impl ActivityPubService { .map_err(|e| anyhow::anyhow!("{e}")) } - /// Resolve a `@user@domain` handle to actor data using a signed HTTP request. - /// Unlike a plain unauthenticated fetch, this works with instances (e.g. Threads) - /// that require HTTP signatures before returning full actor JSON. pub async fn lookup_actor_by_handle( &self, handle: &str, ) -> anyhow::Result { tracing::info!(handle, "looking up remote actor"); let data = self.federation_config.to_request_data(); - let actor = Self::webfinger_https(handle, &data).await + let actor = Self::webfinger_https(handle, &data) + .await .inspect_err(|e| tracing::warn!(handle, error = %e, "actor lookup failed"))?; let domain = actor.ap_id.host_str().unwrap_or("").to_string(); let handle = format!("{}@{}", actor.username, domain); @@ -388,9 +543,7 @@ impl ActivityPubService { }) } - /// Returns the ActivityPub router compatible with any outer state `S`. - /// Handlers only use `Data` injected by the middleware layer, - /// so the router is independent of the application state type. + /// Returns the ActivityPub router. Inbox routes enforce a 1 MB body limit. pub fn router(&self) -> Router where S: Clone + Send + Sync + 'static, @@ -399,19 +552,26 @@ impl ActivityPubService { .route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler)) .route("/nodeinfo/2.0", get(nodeinfo_handler)) .route("/.well-known/webfinger", get(webfinger_handler)) - .route("/inbox", post(inbox_handler)) - .route("/users/{id}/inbox", post(inbox_handler)) + .route( + "/inbox", + post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024)), + ) + .route("/users/{id}", get(actor_handler)) + .route( + "/users/{id}/inbox", + post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024)), + ) .route("/users/{id}/outbox", get(outbox_handler)) + .route("/users/{id}/followers", get(followers_handler)) + .route("/users/{id}/following", get(following_handler)) .layer(self.federation_config.middleware()) } - /// Fan out an Announce activity to all accepted followers. pub async fn broadcast_announce_to_followers( &self, local_user_id: uuid::Uuid, object_ap_id: url::Url, ) -> anyhow::Result<()> { - // Deterministic ID so Undo(Announce) can reference this same activity. let announce_id = url::Url::parse(&format!( "{}/activities/announce/{}", self.base_url, @@ -440,28 +600,17 @@ impl ActivityPubService { to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; - - let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(announce), - &local_actor, - inboxes, - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Announce deliveries failed"); - } - Ok(()) + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, inboxes, announce).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Fan out an Undo(Announce) activity to all accepted followers. pub async fn broadcast_undo_announce_to_followers( &self, local_user_id: uuid::Uuid, object_ap_id: url::Url, ) -> anyhow::Result<()> { - // Reconstruct the same deterministic announce ID used when the boost was sent. let announce_id = url::Url::parse(&format!( "{}/activities/announce/{}", self.base_url, @@ -495,25 +644,12 @@ impl ActivityPubService { "object": object_ap_id.to_string(), }), }; - - let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(undo), - &local_actor, - inboxes, - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some Undo(Announce) deliveries failed" - ); - } - Ok(()) + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Send a Like activity to a single inbox. pub async fn broadcast_like_to_inbox( &self, liker_user_id: uuid::Uuid, @@ -525,7 +661,6 @@ impl ActivityPubService { .await .map_err(|e| anyhow::anyhow!("{e}"))?; - // Deterministic ID so Undo(Like) can reference the same activity. let like_id = url::Url::parse(&format!( "{}/activities/like/{}", self.base_url, @@ -541,25 +676,13 @@ impl ActivityPubService { actor: ObjectId::from(local_actor.ap_id.clone()), object: object_ap_id, }; - - let sends = SendActivityTask::prepare( - &WithContext::new_default(like), - &local_actor, - vec![author_inbox_url], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some Like deliveries failed permanently" - ); - } - Ok(()) + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![author_inbox_url], like) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Send an Undo(Like) activity to a single inbox. pub async fn broadcast_undo_like_to_inbox( &self, liker_user_id: uuid::Uuid, @@ -571,7 +694,6 @@ impl ActivityPubService { .await .map_err(|e| anyhow::anyhow!("{e}"))?; - // Reconstruct the same deterministic like ID. let like_id = url::Url::parse(&format!( "{}/activities/like/{}", self.base_url, @@ -594,27 +716,13 @@ impl ActivityPubService { "object": object_ap_id.to_string(), }), }; - - let sends = SendActivityTask::prepare( - &WithContext::new_default(undo), - &local_actor, - vec![author_inbox_url], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some Undo(Like) deliveries failed permanently" - ); - } - Ok(()) + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![author_inbox_url], undo) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Resolve a `@user@domain` handle to a `DbActor` over HTTPS directly. - /// The library's `webfinger_resolve_actor` tries HTTP first in debug mode, which breaks - /// on servers that don't redirect HTTP → HTTPS. async fn webfinger_https( handle: &str, data: &activitypub_federation::config::Data, @@ -647,7 +755,7 @@ impl ActivityPubService { .and_then(|l| l["href"].as_str()) .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))? .to_owned(); - tracing::debug!(handle, self_href, "webfinger resolved, fetching actor with signature"); + tracing::debug!(handle, self_href, "webfinger resolved, fetching actor"); let self_url = url::Url::parse(&self_href)?; let actor: DbActor = ObjectId::from(self_url) .dereference(data) @@ -673,28 +781,6 @@ impl ActivityPubService { let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; let follow_id_str = follow_id.to_string(); - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: ObjectId::from(remote_actor.ap_id.clone()), - }; - let follow_with_ctx = WithContext::new_default(follow); - - let sends = SendActivityTask::prepare( - &follow_with_ctx, - &local_actor, - vec![remote_actor.inbox()], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some activity deliveries failed permanently" - ); - } let domain = remote_actor.ap_id.host_str().unwrap_or(""); let full_handle = format!("{}@{}", remote_actor.username, domain); @@ -702,19 +788,29 @@ impl ActivityPubService { url: remote_actor.ap_id.to_string(), handle: full_handle, inbox_url: remote_actor.inbox_url.to_string(), - shared_inbox_url: remote_actor - .shared_inbox_url - .as_ref() - .map(|u| u.to_string()), + shared_inbox_url: remote_actor.shared_inbox_url.as_ref().map(|u| u.to_string()), display_name: Some(remote_actor.username.clone()), avatar_url: remote_actor.avatar_url.as_ref().map(|u| u.to_string()), outbox_url: Some(remote_actor.outbox_url.to_string()), }; + + // Save BEFORE delivering Follow — prevents lost state if process restarts + // between delivery and the DB write. data.federation_repo .add_following(local_user_id, remote, &follow_id_str) .await?; - Ok(()) + let follow = FollowActivity { + id: Url::parse(&follow_id_str)?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_actor.ap_id.clone()), + }; + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![remote_actor.inbox()], follow) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn unfollow( @@ -725,9 +821,7 @@ impl ActivityPubService { let data = self.federation_config.to_request_data(); if actor_url_str.starts_with(&self.base_url) { - return self - .unfollow_local(local_user_id, actor_url_str, &data) - .await; + return self.unfollow_local(local_user_id, actor_url_str, &data).await; } let remote = data @@ -766,20 +860,10 @@ impl ActivityPubService { object: serde_json::to_value(&follow).map_err(|e| anyhow::anyhow!("{e}"))?, }; - let sends = SendActivityTask::prepare( - &WithContext::new_default(undo), - &local_actor, - vec![inbox], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some activity deliveries failed permanently" - ); - } + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, vec![inbox], undo).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; data.federation_repo .remove_following(local_user_id, actor_url_str) @@ -829,24 +913,17 @@ impl ActivityPubService { object: follow, }; + // Mark accepted BEFORE delivering Accept. Local state is authoritative; + // if delivery fails, the consumer's job queue retries it. data.federation_repo .update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted) .await?; let inbox = Url::parse(&remote_actor.inbox_url)?; - let sends = SendActivityTask::prepare( - &WithContext::new_default(accept), - &local_actor, - vec![inbox.clone()], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - "failed to deliver Accept activity, but follower is marked accepted locally" - ); - } + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, vec![inbox], accept).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; let target_inbox = remote_actor .shared_inbox_url @@ -888,20 +965,10 @@ impl ActivityPubService { }; let inbox = Url::parse(&remote_actor.inbox_url)?; - let sends = SendActivityTask::prepare( - &WithContext::new_default(reject), - &local_actor, - vec![inbox], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some activity deliveries failed permanently" - ); - } + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, vec![inbox], reject).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; data.federation_repo .remove_follower(local_user_id, remote_actor_url) @@ -969,7 +1036,6 @@ impl ActivityPubService { .await } - /// Broadcast a Delete activity to all accepted followers for a removed review. pub async fn broadcast_delete_to_followers( &self, local_user_id: uuid::Uuid, @@ -992,20 +1058,12 @@ impl ActivityPubService { to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; - let delete_with_ctx = WithContext::new_default(delete); - let sends = - SendActivityTask::prepare(&delete_with_ctx, &local_actor, inboxes, &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some delete activity deliveries failed" - ); - } - Ok(()) + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, inboxes, delete).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Broadcast an Add(WatchlistObject) activity to all accepted followers. pub async fn broadcast_add_to_followers( &self, local_user_id: uuid::Uuid, @@ -1027,16 +1085,12 @@ impl ActivityPubService { to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; - let add_with_ctx = WithContext::new_default(add); - let sends = SendActivityTask::prepare(&add_with_ctx, &local_actor, inboxes, &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Add deliveries failed"); - } - Ok(()) + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, inboxes, add).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Broadcast an Undo(Add) activity to all accepted followers. pub async fn broadcast_undo_add_to_followers( &self, local_user_id: uuid::Uuid, @@ -1061,18 +1115,12 @@ impl ActivityPubService { "object": { "id": watchlist_entry_ap_id.as_str() } }), }; - let undo_with_ctx = WithContext::new_default(undo); - let sends = SendActivityTask::prepare(&undo_with_ctx, &local_actor, inboxes, &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Undo(Add) deliveries failed"); - } - Ok(()) + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Fan out a Create(Note) activity to all accepted followers. - /// `note` is the fully-formed Note JSON (including id, type, content, etc.). - /// The activity ID is derived deterministically from the note's `id` field. pub async fn broadcast_create_note( &self, local_user_id: uuid::Uuid, @@ -1103,22 +1151,12 @@ impl ActivityPubService { bto: vec![], bcc: vec![], }; - let sends = SendActivityTask::prepare( - &WithContext::new_default(create), - &local_actor, - inboxes, - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Create(Note) deliveries failed"); - } - Ok(()) + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, inboxes, create).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Fan out an Update(Note) activity to all accepted followers. - /// `note` is the fully-formed Note JSON. pub async fn broadcast_update_note( &self, local_user_id: uuid::Uuid, @@ -1131,8 +1169,8 @@ impl ActivityPubService { return Ok(()); }; - let update_id = crate::urls::activity_url(&self.base_url) - .map_err(|e| anyhow::anyhow!("{e}"))?; + let update_id = + crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; let update = crate::activities::UpdateActivity { id: update_id, @@ -1142,18 +1180,10 @@ impl ActivityPubService { to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; - let sends = SendActivityTask::prepare( - &WithContext::new_default(update), - &local_actor, - inboxes, - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Update(Note) deliveries failed"); - } - Ok(()) + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> { @@ -1169,7 +1199,6 @@ impl ActivityPubService { .into_json(&data) .await .map_err(|e| anyhow::anyhow!("{e}"))?; - // Wrap with @context so Mastodon's JSON-LD processor can resolve field names. let person_json = serde_json::to_value(WithContext::new_default(person))?; let update_id = Url::parse(&format!( @@ -1187,57 +1216,18 @@ impl ActivityPubService { cc: vec![local_actor.followers_url.to_string()], }; - let followers = data.federation_repo.get_followers(user_id).await?; - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .collect(); - - if accepted.is_empty() { - tracing::info!(user_id = %user_id, "no accepted followers, skipping actor update broadcast"); + let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else { + tracing::info!(%user_id, "no accepted followers, skipping actor update broadcast"); return Ok(()); - } + }; - let inboxes = collect_inboxes(&accepted); - tracing::info!( - user_id = %user_id, - follower_count = accepted.len(), - inbox_count = inboxes.len(), - inboxes = ?inboxes, - "broadcasting actor update" - ); - - let sends = SendActivityTask::prepare( - &WithContext::new_default(update), - &local_actor, - inboxes, - &data, - ) - .await?; - - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - return Err(anyhow::anyhow!( - "actor update delivery failed for {} inbox(es): {}", - failures.len(), - failures - .iter() - .map(|e| e.to_string()) - .collect::>() - .join("; ") - )); - } - tracing::info!(user_id = %user_id, "actor update broadcast complete"); - Ok(()) + tracing::info!(%user_id, inbox_count = inboxes.len(), "broadcasting actor update"); + let (json, sends, inboxes) = + self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - /// Broadcast a Move activity to all accepted followers, signalling that this - /// actor is migrating to `new_actor_url`. - /// - /// **Pre-condition (caller's responsibility):** - /// Before calling this, the application must persist `also_known_as = [new_actor_url]` - /// in the local actor's row so the old actor JSON already advertises the new URL - /// when remote servers fetch it to verify the cross-reference. pub async fn broadcast_move( &self, user_id: uuid::Uuid, @@ -1248,21 +1238,10 @@ impl ActivityPubService { .await .map_err(|e| anyhow::anyhow!("{e}"))?; - let followers = data.federation_repo.get_followers(user_id).await?; - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .collect(); - - if accepted.is_empty() { - tracing::info!( - %user_id, - "broadcast_move: no accepted followers, nothing to send" - ); + let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else { + tracing::info!(%user_id, "broadcast_move: no accepted followers, nothing to send"); return Ok(()); - } - - let inboxes = collect_inboxes(&accepted); + }; let move_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; @@ -1275,26 +1254,16 @@ impl ActivityPubService { target: new_actor_url.clone(), }; - let sends = SendActivityTask::prepare( - &WithContext::new_default(move_activity), - &local_actor, - inboxes, - &data, - ) - .await?; - - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!( - count = failures.len(), - "some Move deliveries failed permanently" - ); - } + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, move_activity) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; tracing::info!( %user_id, target = %new_actor_url, - "broadcast_move: delivered to all accepted followers" + "broadcast_move: dispatched to all accepted followers" ); Ok(()) } @@ -1332,17 +1301,11 @@ impl ActivityPubService { object: Url::parse(actor_url)?, }; let inbox = Url::parse(&remote_actor.inbox_url)?; - let sends = SendActivityTask::prepare( - &WithContext::new_default(block), - &local_actor, - vec![inbox], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(actor = %actor_url, "failed to deliver Block activity"); - } + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![inbox], block) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; } Ok(()) @@ -1576,12 +1539,16 @@ impl ActivityPubService { fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { let config = self.federation_config.clone(); let base_url = self.base_url.clone(); + let max_attempts = self.delivery_max_attempts; + let initial_delay = self.delivery_initial_delay_secs; tokio::spawn(async move { if let Err(e) = ActivityPubService::run_backfill( config, base_url, owner_user_id, follower_inbox_url, + max_attempts, + initial_delay, ) .await { @@ -1595,6 +1562,8 @@ impl ActivityPubService { base_url: String, owner_user_id: uuid::Uuid, follower_inbox_url: String, + max_attempts: u32, + initial_delay: u64, ) -> anyhow::Result<()> { const BATCH_SIZE: usize = 20; @@ -1616,7 +1585,6 @@ impl ActivityPubService { for chunk in objects.chunks(BATCH_SIZE) { for (ap_id, object_json) in chunk { - // Use a stable Create activity ID derived from the object's ap_id let create_id = Url::parse(&format!( "{}/activities/create/{}", base_url, @@ -1641,7 +1609,7 @@ impl ActivityPubService { &data, ) .await?; - let failures = send_with_retry(sends, &data).await; + let failures = send_with_retry(sends, &data, max_attempts, initial_delay).await; if failures.is_empty() { success_count += 1; } else { @@ -1662,6 +1630,7 @@ impl ActivityPubService { Ok(()) } } + #[cfg(test)] #[path = "tests/service.rs"] mod tests; diff --git a/src/tests/actors.rs b/src/tests/actors.rs index 7f510c4..b19a8f2 100644 --- a/src/tests/actors.rs +++ b/src/tests/actors.rs @@ -10,9 +10,9 @@ fn person_serializes_with_enriched_fields() { .into(), preferred_username: "alice".to_string(), inbox: "https://example.com/users/1/inbox".parse().unwrap(), - outbox: "https://example.com/users/1/outbox".parse().unwrap(), - followers: "https://example.com/users/1/followers".parse().unwrap(), - following: "https://example.com/users/1/following".parse().unwrap(), + outbox: Some("https://example.com/users/1/outbox".parse().unwrap()), + followers: Some("https://example.com/users/1/followers".parse().unwrap()), + following: Some("https://example.com/users/1/following".parse().unwrap()), public_key: PublicKey { id: "https://example.com/users/1#main-key".to_string(), owner: "https://example.com/users/1".parse().unwrap(), diff --git a/src/tests/service.rs b/src/tests/service.rs index 336f589..6467f5f 100644 --- a/src/tests/service.rs +++ b/src/tests/service.rs @@ -1,45 +1,3 @@ -use super::*; -use crate::repository::{Follower, FollowerStatus, RemoteActor}; - -fn make_follower(inbox: &str, shared: Option<&str>) -> Follower { - Follower { - actor: RemoteActor { - url: format!("https://remote/{}", inbox), - handle: "user".to_string(), - inbox_url: inbox.to_string(), - shared_inbox_url: shared.map(|s| s.to_string()), - display_name: None, - avatar_url: None, - outbox_url: None, - }, - status: FollowerStatus::Accepted, - } -} - -#[test] -fn collect_inboxes_deduplicates_shared() { - let followers = vec![ - make_follower( - "https://mastodon.social/users/a/inbox", - Some("https://mastodon.social/inbox"), - ), - make_follower( - "https://mastodon.social/users/b/inbox", - Some("https://mastodon.social/inbox"), - ), - make_follower("https://other.instance/users/c/inbox", None), - ]; - let inboxes = collect_inboxes(&followers); - assert_eq!(inboxes.len(), 2); - let strs: Vec<_> = inboxes.iter().map(|u| u.as_str()).collect(); - assert!(strs.contains(&"https://mastodon.social/inbox")); - assert!(strs.contains(&"https://other.instance/users/c/inbox")); -} - -#[test] -fn collect_inboxes_falls_back_to_individual_inbox() { - let followers = vec![make_follower("https://example.com/users/x/inbox", None)]; - let inboxes = collect_inboxes(&followers); - assert_eq!(inboxes.len(), 1); - assert_eq!(inboxes[0].as_str(), "https://example.com/users/x/inbox"); -} +// Inbox deduplication (shared_inbox preference, blocked-actor/domain filtering) +// is now enforced by the repository implementation via `get_accepted_follower_inboxes`. +// Integration tests for broadcast delivery live in the consuming crate's test suite. diff --git a/src/user.rs b/src/user.rs index 6447c34..69fe07b 100644 --- a/src/user.rs +++ b/src/user.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use url::Url; #[derive(Debug, Clone)] @@ -7,6 +8,22 @@ pub struct ApProfileField { pub value: String, } +/// Actor type for AP serialization. Defaults to `Person`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ApActorType { + Person, + Service, + Application, + Organization, + Group, +} + +impl Default for ApActorType { + fn default() -> Self { + Self::Person + } +} + /// Resolved actor data returned by [`crate::service::ActivityPubService::lookup_actor_by_handle`]. /// Fetched via a signed HTTP request so strict instances (e.g. Threads) return full data. #[derive(Debug, Clone)] @@ -29,12 +46,18 @@ pub struct LookedUpActor { pub struct ApUser { pub id: uuid::Uuid, pub username: String, + pub display_name: Option, pub bio: Option, pub avatar_url: Option, pub banner_url: Option, pub also_known_as: Option, pub profile_url: Option, pub attachment: Vec, + /// If true, incoming Follow requests must be manually approved before the + /// actor is listed as `manuallyApprovesFollowers=true` in AP JSON. + pub manually_approves_followers: bool, + /// AP actor type serialized in the actor JSON. Defaults to `Person`. + pub actor_type: ApActorType, } #[async_trait] diff --git a/src/webfinger.rs b/src/webfinger.rs index 8754287..e3d994f 100644 --- a/src/webfinger.rs +++ b/src/webfinger.rs @@ -1,13 +1,13 @@ use activitypub_federation::{ config::Data, - fetch::webfinger::{Webfinger, build_webfinger_response, extract_webfinger_name}, + fetch::webfinger::{extract_webfinger_name}, }; use axum::{ extract::Query, http::header, response::{IntoResponse, Response}, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use crate::data::FederationData; use crate::error::Error; @@ -17,6 +17,23 @@ pub struct WebfingerQuery { resource: String, } +#[derive(Serialize)] +struct WebfingerLink { + rel: String, + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + kind: Option, + #[serde(skip_serializing_if = "Option::is_none")] + href: Option, +} + +#[derive(Serialize)] +struct WebfingerResponse { + subject: String, + /// Canonical URIs for the same account (acct: URI + AP actor URL). + aliases: Vec, + links: Vec, +} + pub async fn webfinger_handler( Query(query): Query, data: Data, @@ -31,8 +48,25 @@ pub async fn webfinger_handler( .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; let ap_id = crate::urls::actor_url(&data.base_url, user.id); + let acct_uri = format!("acct:{}@{}", user.username, data.domain); + + let wf = WebfingerResponse { + subject: query.resource.clone(), + aliases: vec![acct_uri, ap_id.to_string()], + links: vec![ + WebfingerLink { + rel: "http://webfinger.net/rel/profile-page".to_string(), + kind: Some("text/html".to_string()), + href: Some(ap_id.to_string()), + }, + WebfingerLink { + rel: "self".to_string(), + kind: Some("application/activity+json".to_string()), + href: Some(ap_id.to_string()), + }, + ], + }; - let wf: Webfinger = build_webfinger_response(query.resource, ap_id); let body = serde_json::to_string(&wf).map_err(|e| Error::from(anyhow::anyhow!(e)))?; Ok(([(header::CONTENT_TYPE, "application/jrd+json")], body).into_response()) }