feat: production hardening — security, scale, protocol, DX

Breaking changes to FederationRepository, ApObjectHandler, ApUser:

FederationRepository:
- add is_activity_processed / mark_activity_processed (inbox idempotency)
- add get_accepted_follower_inboxes (DB-side dedup/filtering, replaces in-memory load-all)

ApObjectHandler:
- add on_announce_of_remote (cross-server boosts, previously silently dropped)

ApUser:
- add manually_approves_followers: bool
- add actor_type: ApActorType (was hardcoded Person)

Security:
- block check before actor HTTP fetch in Follow (prevents SSRF on blocked actors)
- 4xx responses use generic "not found"/"bad request" (no internal leak)
- 1 MB DefaultBodyLimit on inbox routes
- zeroize private key after generation

Delivery:
- all broadcasts are now non-blocking (tokio::spawn fallback, or EventPublisher queue)
- EventPublisher redesigned with typed FederationEvent enum (DeliveryRequested/DeliveryFailed)
- new deliver_to_inbox() public method for queue consumers
- configurable delivery_max_attempts and delivery_initial_delay_secs via builder
- Follow saved as Pending BEFORE delivery (race condition fix)

Router:
- GET /users/{id} (actor), GET /users/{id}/followers, GET /users/{id}/following now mounted

Protocol:
- mention extraction from Create/Update tag arrays → on_mention() dispatched
- WebFinger: add aliases field (acct: URI + AP actor URL)
- outbox: add last link, use count_local_posts for totalItems
- idempotency guard added to every inbound activity receive()
- actor serializes display_name and configurable actor_type/manually_approves_followers

Bump: 0.1.10 → 0.2.0
This commit is contained in:
2026-05-28 23:35:41 +02:00
parent b557bd9d46
commit 7ccc18e85c
17 changed files with 700 additions and 494 deletions

17
Cargo.lock generated
View File

@@ -1368,7 +1368,7 @@ dependencies = [
[[package]] [[package]]
name = "k-ap" name = "k-ap"
version = "0.1.9" version = "0.2.0"
dependencies = [ dependencies = [
"activitypub_federation", "activitypub_federation",
"anyhow", "anyhow",
@@ -1384,6 +1384,7 @@ dependencies = [
"tracing", "tracing",
"url", "url",
"uuid", "uuid",
"zeroize",
] ]
[[package]] [[package]]
@@ -3230,6 +3231,20 @@ name = "zeroize"
version = "1.8.2" version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
dependencies = [
"zeroize_derive",
]
[[package]]
name = "zeroize_derive"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]] [[package]]
name = "zerotrie" name = "zerotrie"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "k-ap" name = "k-ap"
version = "0.1.10" version = "0.2.0"
edition = "2024" edition = "2024"
description = "Generic ActivityPub protocol layer" description = "Generic ActivityPub protocol layer"
license = "MIT" license = "MIT"
@@ -21,3 +21,4 @@ reqwest = { version = "0.13", features = ["json"] }
url = { version = "2", features = ["serde"] } url = { version = "2", features = ["serde"] }
enum_delegate = "0.2" enum_delegate = "0.2"
activitypub_federation = "0.7.0-beta.11" activitypub_federation = "0.7.0-beta.11"
zeroize = { version = "1", features = ["derive"] }

View File

@@ -82,34 +82,44 @@ impl Activity for FollowActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let domain = self.actor().host_str().unwrap_or(""); if already_processed(&self.id, data).await {
if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
return Ok(()); return Ok(());
} }
let _follower = self.actor.dereference(data).await?; let actor_url = self.actor.inner();
let local_actor = self.object.dereference(data).await?; let domain = actor_url.host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %actor_url, "ignoring follow from blocked domain");
return Ok(());
}
// Check per-actor block BEFORE issuing any outbound HTTP request.
// We can derive the target user ID from the follow object URL without dereferencing.
if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) {
if data if data
.federation_repo .federation_repo
.is_actor_blocked(local_actor.user_id, self.actor.inner().as_str()) .is_actor_blocked(target_user_id, actor_url.as_str())
.await? .await?
{ {
tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor"); tracing::info!(actor = %actor_url, "ignoring follow from blocked actor");
return Ok(()); return Ok(());
} }
}
let _follower = self.actor.dereference(data).await?;
let local_actor = self.object.dereference(data).await?;
data.federation_repo data.federation_repo
.add_follower( .add_follower(
local_actor.user_id, local_actor.user_id,
self.actor.inner().as_str(), actor_url.as_str(),
FollowerStatus::Pending, FollowerStatus::Pending,
self.id.as_str(), self.id.as_str(),
) )
.await?; .await?;
tracing::info!( tracing::info!(
follower = %self.actor.inner(), follower = %actor_url,
local_user = %local_actor.user_id, local_user = %local_actor.user_id,
"follow request pending approval" "follow request pending approval"
); );
@@ -152,6 +162,9 @@ impl Activity for AcceptActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
@@ -207,6 +220,9 @@ impl Activity for RejectActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
@@ -260,6 +276,9 @@ impl Activity for UndoActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring Undo from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring Undo from blocked domain");
@@ -375,6 +394,9 @@ impl Activity for CreateActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
@@ -389,6 +411,10 @@ impl Activity for CreateActivity {
.and_then(|s| Url::parse(s).ok()) .and_then(|s| Url::parse(s).ok())
.unwrap_or_else(|| self.id.clone()); .unwrap_or_else(|| self.id.clone());
let actor_url = self.actor.inner().clone(); let actor_url = self.actor.inner().clone();
// Extract Mention tags and notify local users.
extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await;
data.object_handler data.object_handler
.on_create(&ap_id, &actor_url, self.object) .on_create(&ap_id, &actor_url, self.object)
.await .await
@@ -451,6 +477,9 @@ impl Activity for DeleteActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
@@ -535,6 +564,9 @@ impl Activity for UpdateActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
@@ -547,6 +579,10 @@ impl Activity for UpdateActivity {
.and_then(|s| Url::parse(s).ok()) .and_then(|s| Url::parse(s).ok())
.unwrap_or_else(|| self.id.clone()); .unwrap_or_else(|| self.id.clone());
let actor_url = self.actor.inner().clone(); let actor_url = self.actor.inner().clone();
// Re-extract mentions on update so newly-added mentions are notified.
extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await;
data.object_handler data.object_handler
.on_update(&ap_id, &actor_url, self.object) .on_update(&ap_id, &actor_url, self.object)
.await .await
@@ -592,6 +628,9 @@ impl Activity for AnnounceActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
@@ -599,10 +638,17 @@ impl Activity for AnnounceActivity {
} }
let object_domain = self.object.host_str().unwrap_or(""); let object_domain = self.object.host_str().unwrap_or("");
if object_domain != data.domain { if object_domain != data.domain {
// Cross-server boost: notify the handler so consumers can surface it.
data.object_handler
.on_announce_of_remote(&self.object, self.actor.inner())
.await
.unwrap_or_else(|e| {
tracing::warn!(error = %e, "failed to process cross-server announce");
});
tracing::debug!( tracing::debug!(
actor = %self.actor.inner(), actor = %self.actor.inner(),
object = %self.object, object = %self.object,
"received Announce of non-local object — skipped (cross-server boost not supported)" "received Announce of non-local object"
); );
return Ok(()); return Ok(());
} }
@@ -656,6 +702,9 @@ impl Activity for LikeActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain");
@@ -723,6 +772,9 @@ impl Activity for AddActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring Add from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring Add from blocked domain");
@@ -774,6 +826,9 @@ impl Activity for BlockActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain"); tracing::info!(actor = %self.actor(), "ignoring activity from blocked domain");
@@ -833,6 +888,9 @@ impl Activity for MoveActivity {
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if already_processed(&self.id, data).await {
return Ok(());
}
let domain = self.actor().host_str().unwrap_or(""); let domain = self.actor().host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
return Ok(()); return Ok(());
@@ -922,6 +980,76 @@ impl Activity for MoveActivity {
} }
} }
// --- Idempotency guard ---
/// Returns `true` if the activity was already processed (caller should return `Ok(())`).
/// Marks the activity as processed before returning `false`.
/// On any repository error the check is skipped to avoid silently dropping activities.
async fn already_processed(activity_id: &Url, data: &Data<FederationData>) -> bool {
let id = activity_id.as_str();
match data.federation_repo.is_activity_processed(id).await {
Ok(true) => {
tracing::debug!(activity_id = id, "duplicate activity, skipping");
return true;
}
Ok(false) => {
if let Err(e) = data.federation_repo.mark_activity_processed(id).await {
tracing::warn!(activity_id = id, error = %e, "failed to mark activity processed");
}
}
Err(e) => {
tracing::warn!(error = %e, "idempotency check failed, processing anyway");
}
}
false
}
// --- Mention extraction ---
/// Parse `object["tag"]` for Mention entries and call `on_mention` for each
/// local user that is tagged. Failures are logged but never propagated — a
/// broken mention notification must not fail the entire activity.
async fn extract_and_dispatch_mentions(
ap_id: &Url,
actor_url: &Url,
object: &serde_json::Value,
data: &Data<FederationData>,
) {
let tags = match object.get("tag").and_then(|t| t.as_array()) {
Some(t) => t,
None => return,
};
for tag in tags {
let tag_type = tag.get("type").and_then(|v| v.as_str()).unwrap_or("");
if tag_type != "Mention" {
continue;
}
let href = match tag.get("href").and_then(|v| v.as_str()) {
Some(h) => h,
None => continue,
};
let Ok(href_url) = Url::parse(href) else { continue };
// Only dispatch for local actors.
let Some(mentioned_user_id) = crate::urls::extract_user_id_from_url(&href_url) else {
continue;
};
if let Err(e) = data
.object_handler
.on_mention(ap_id, mentioned_user_id, actor_url)
.await
{
tracing::warn!(
ap_id = %ap_id,
mentioned_user = %mentioned_user_id,
error = %e,
"failed to dispatch mention notification"
);
}
}
}
// --- Inbox dispatch enum --- // --- Inbox dispatch enum ---
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]

View File

@@ -7,18 +7,16 @@ use crate::actors::{Person, get_local_actor};
use crate::data::FederationData; use crate::data::FederationData;
use crate::error::Error; use crate::error::Error;
/// Serves the AP actor JSON for a local user.
/// The path parameter is the user's UUID (matching the canonical actor URL).
pub async fn actor_handler( pub async fn actor_handler(
Path(username): Path<String>, Path(user_id_str): Path<String>,
data: Data<FederationData>, data: Data<FederationData>,
) -> Result<FederationJson<WithContext<Person>>, Error> { ) -> Result<FederationJson<WithContext<Person>>, Error> {
let ap_user = data let user_id = uuid::Uuid::parse_str(&user_id_str)
.user_repo .map_err(|_| Error::not_found(anyhow::anyhow!("user not found")))?;
.find_by_username(&username)
.await
.map_err(Error::from)?
.ok_or_else(|| Error::bad_request(anyhow::anyhow!("user not found")))?;
let db_actor = get_local_actor(ap_user.id, &data).await?; let db_actor = get_local_actor(user_id, &data).await?;
let person = db_actor.into_json(&data).await?; let person = db_actor.into_json(&data).await?;
Ok(FederationJson(WithContext::new_default(person))) Ok(FederationJson(WithContext::new_default(person)))

View File

@@ -8,11 +8,12 @@ use activitypub_federation::{
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
use zeroize::Zeroizing;
use crate::data::FederationData; use crate::data::FederationData;
use crate::error::Error; use crate::error::Error;
use crate::repository::RemoteActor; use crate::repository::RemoteActor;
use crate::user::ApProfileField; use crate::user::{ApActorType, ApProfileField};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct DbActor { pub struct DbActor {
@@ -20,6 +21,8 @@ pub struct DbActor {
pub username: String, pub username: String,
pub display_name: Option<String>, pub display_name: Option<String>,
pub public_key_pem: String, pub public_key_pem: String,
/// Private key PEM. Only populated for local actors during signing.
/// Cleared automatically when `DbActor` is dropped.
pub private_key_pem: Option<String>, pub private_key_pem: Option<String>,
pub inbox_url: Url, pub inbox_url: Url,
pub shared_inbox_url: Option<Url>, pub shared_inbox_url: Option<Url>,
@@ -34,6 +37,8 @@ pub struct DbActor {
pub also_known_as: Option<String>, pub also_known_as: Option<String>,
pub profile_url: Option<Url>, pub profile_url: Option<Url>,
pub attachment: Vec<ApProfileField>, pub attachment: Vec<ApProfileField>,
pub manually_approves_followers: bool,
pub actor_type: ApActorType,
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -57,22 +62,6 @@ pub struct ProfileFieldObject {
pub value: String, pub value: String,
} }
/// Accepts any AP actor type on inbound JSON; always serializes as "Person" for local actors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ApActorType {
Person,
Service,
Application,
Organization,
Group,
}
impl Default for ApActorType {
fn default() -> Self {
Self::Person
}
}
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Person { pub struct Person {
@@ -155,9 +144,17 @@ pub async fn get_local_actor(
Some(kp) => kp, Some(kp) => kp,
None => { None => {
let kp = generate_actor_keypair()?; let kp = generate_actor_keypair()?;
// Zeroize the private key after storing it so the plaintext doesn't
// linger in memory beyond this scope.
let private_zeroized = Zeroizing::new(kp.private_key.clone());
data.federation_repo data.federation_repo
.save_local_actor_keypair(user_id, kp.public_key.clone(), kp.private_key.clone()) .save_local_actor_keypair(
user_id,
kp.public_key.clone(),
private_zeroized.clone().to_string(),
)
.await?; .await?;
drop(private_zeroized);
(kp.public_key, kp.private_key) (kp.public_key, kp.private_key)
} }
}; };
@@ -174,7 +171,7 @@ pub async fn get_local_actor(
Ok(DbActor { Ok(DbActor {
user_id, user_id,
username: user.username, username: user.username,
display_name: None, display_name: user.display_name,
public_key_pem: public_key, public_key_pem: public_key,
private_key_pem: Some(private_key), private_key_pem: Some(private_key),
inbox_url, inbox_url,
@@ -190,6 +187,8 @@ pub async fn get_local_actor(
also_known_as: user.also_known_as, also_known_as: user.also_known_as,
profile_url: user.profile_url, profile_url: user.profile_url,
attachment: user.attachment, attachment: user.attachment,
manually_approves_followers: user.manually_approves_followers,
actor_type: user.actor_type,
}) })
} }
@@ -246,8 +245,8 @@ impl Object for DbActor {
Ok(Some(DbActor { Ok(Some(DbActor {
user_id, user_id,
username: user.username, username: user.username.clone(),
display_name: None, display_name: user.display_name,
public_key_pem: public_key, public_key_pem: public_key,
private_key_pem: private_key, private_key_pem: private_key,
inbox_url, inbox_url,
@@ -263,6 +262,8 @@ impl Object for DbActor {
also_known_as: user.also_known_as, also_known_as: user.also_known_as,
profile_url: user.profile_url, profile_url: user.profile_url,
attachment: user.attachment, attachment: user.attachment,
manually_approves_followers: user.manually_approves_followers,
actor_type: user.actor_type,
})) }))
} }
@@ -281,7 +282,6 @@ impl Object for DbActor {
kind: "Image".to_string(), kind: "Image".to_string(),
url, url,
}); });
let profile_url = self.profile_url;
let also_known_as: Vec<String> = self.also_known_as.into_iter().collect(); let also_known_as: Vec<String> = self.also_known_as.into_iter().collect();
let attachment: Vec<ProfileFieldObject> = self let attachment: Vec<ProfileFieldObject> = self
.attachment .attachment
@@ -297,7 +297,7 @@ impl Object for DbActor {
Url::parse(&format!("{}/inbox", data.base_url)).expect("base_url is always valid"); Url::parse(&format!("{}/inbox", data.base_url)).expect("base_url is always valid");
Ok(Person { Ok(Person {
kind: Default::default(), kind: self.actor_type,
id: self.ap_id.clone().into(), id: self.ap_id.clone().into(),
preferred_username: self.username.clone(), preferred_username: self.username.clone(),
inbox: self.inbox_url.clone(), inbox: self.inbox_url.clone(),
@@ -305,12 +305,12 @@ impl Object for DbActor {
followers: Some(self.followers_url.clone()), followers: Some(self.followers_url.clone()),
following: Some(self.following_url.clone()), following: Some(self.following_url.clone()),
public_key, public_key,
name: Some(self.username.clone()), name: self.display_name.or_else(|| Some(self.username.clone())),
summary: self.bio.clone(), summary: self.bio.clone(),
icon, icon,
url: profile_url, url: self.profile_url,
discoverable: Some(true), discoverable: Some(true),
manually_approves_followers: true, manually_approves_followers: self.manually_approves_followers,
updated: Some(self.last_refreshed_at), updated: Some(self.last_refreshed_at),
endpoints: Some(Endpoints { shared_inbox }), endpoints: Some(Endpoints { shared_inbox }),
image, image,
@@ -397,6 +397,8 @@ impl Object for DbActor {
value: f.value.clone(), value: f.value.clone(),
}) })
.collect(), .collect(),
manually_approves_followers: json.manually_approves_followers,
actor_type: json.kind,
}) })
} }
} }

View File

@@ -63,6 +63,16 @@ pub trait ApObjectHandler: Send + Sync {
actor_url: &Url, actor_url: &Url,
) -> anyhow::Result<()>; ) -> anyhow::Result<()>;
/// Called when a remote actor boosts (Announce) a non-local object.
/// Use this to surface cross-server boosts in followers' feeds.
/// `object_url` is the AP URL of the announced note.
/// `actor_url` is the AP URL of the remote actor who sent the Announce.
async fn on_announce_of_remote(
&self,
object_url: &Url,
actor_url: &Url,
) -> anyhow::Result<()>;
/// Total number of locally-authored posts across all users. /// Total number of locally-authored posts across all users.
async fn count_local_posts(&self) -> anyhow::Result<u64>; async fn count_local_posts(&self) -> anyhow::Result<u64>;
} }

View File

@@ -4,11 +4,47 @@ use crate::content::ApObjectHandler;
use crate::repository::FederationRepository; use crate::repository::FederationRepository;
use crate::user::ApUserRepository; use crate::user::ApUserRepository;
/// Minimal event-publishing abstraction — project-specific implementations /// Typed event emitted by the federation layer. Consumers wire in an
/// are wired in by the consuming crate via `FederationData::new`. /// [`EventPublisher`] to receive these and drive side effects (job queues,
/// webhooks, metrics, etc.).
///
/// # Delivery flow
///
/// When an `EventPublisher` is configured, outbound activities are NOT
/// delivered directly — instead a [`FederationEvent::DeliveryRequested`] event
/// is published for each target inbox. The consumer's job queue should:
/// 1. Persist the event.
/// 2. Call [`crate::service::ActivityPubService::deliver_to_inbox`] when
/// processing the queue item.
///
/// Without a publisher, the library falls back to fire-and-forget
/// `tokio::spawn` delivery (no persistence across restarts).
#[derive(Debug, Clone)]
pub enum FederationEvent {
/// An outbound activity must be delivered to `inbox`.
/// Call `ActivityPubService::deliver_to_inbox(inbox, activity, signing_actor_id)`.
DeliveryRequested {
inbox: url::Url,
activity: serde_json::Value,
signing_actor_id: uuid::Uuid,
},
/// Delivery to `inbox` failed permanently after all in-process retries.
/// The consumer may schedule additional retries or alert.
DeliveryFailed {
inbox: url::Url,
activity: serde_json::Value,
signing_actor_id: uuid::Uuid,
error: String,
},
}
/// Receives typed federation events from the library.
///
/// Implement this trait to bridge federation events into your application's
/// job queue, message broker, or metrics system.
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait EventPublisher: Send + Sync { pub trait EventPublisher: Send + Sync {
async fn publish(&self, event: &str) -> anyhow::Result<()>; async fn publish(&self, event: FederationEvent) -> anyhow::Result<()>;
} }
#[derive(Clone)] #[derive(Clone)]
@@ -20,7 +56,6 @@ pub struct FederationData {
pub(crate) domain: String, pub(crate) domain: String,
pub(crate) allow_registration: bool, pub(crate) allow_registration: bool,
pub(crate) software_name: String, pub(crate) software_name: String,
#[allow(dead_code)]
pub(crate) event_publisher: Option<Arc<dyn EventPublisher>>, pub(crate) event_publisher: Option<Arc<dyn EventPublisher>>,
} }

View File

@@ -33,15 +33,18 @@ where
impl axum::response::IntoResponse for Error { impl axum::response::IntoResponse for Error {
fn into_response(self) -> axum::response::Response { fn into_response(self) -> axum::response::Response {
let status = self.1; let status = self.1;
// Always log the real error internally; never expose it to the client.
if status.is_server_error() { if status.is_server_error() {
tracing::error!(error = %self.0, status = status.as_u16(), "federation error"); tracing::error!(error = %self.0, status = status.as_u16(), "federation error");
} else { } else {
tracing::debug!(error = %self.0, status = status.as_u16(), "federation response"); tracing::debug!(error = %self.0, status = status.as_u16(), "federation client error");
} }
let body = if status.is_server_error() { let body = match status {
"internal server error".to_string() StatusCode::NOT_FOUND => "not found",
} else { StatusCode::BAD_REQUEST => "bad request",
self.0.to_string() StatusCode::UNAUTHORIZED => "unauthorized",
StatusCode::FORBIDDEN => "forbidden",
_ => "internal server error",
}; };
(status, body).into_response() (status, body).into_response()
} }

View File

@@ -9,6 +9,11 @@ use crate::actors::DbActor;
use crate::data::FederationData; use crate::data::FederationData;
use crate::error::Error; use crate::error::Error;
/// Idempotency is enforced inside each activity's `receive()` implementation
/// via `FederationRepository::is_activity_processed` /
/// `mark_activity_processed`. HTTP signature verification and JSON-LD
/// processing are handled by `activitypub_federation` middleware before this
/// handler is reached.
pub async fn inbox_handler( pub async fn inbox_handler(
data: Data<FederationData>, data: Data<FederationData>,
activity_data: ActivityData, activity_data: ActivityData,

View File

@@ -18,11 +18,11 @@ pub mod webfinger;
pub use urls::AS_PUBLIC; pub use urls::AS_PUBLIC;
pub use activitypub_federation::kinds::object::NoteType; pub use activitypub_federation::kinds::object::NoteType;
pub use content::ApObjectHandler; pub use content::ApObjectHandler;
pub use data::FederationData; pub use data::{EventPublisher, FederationData, FederationEvent};
pub use error::Error; pub use error::Error;
pub use federation::ApFederationConfig; pub use federation::ApFederationConfig;
pub use repository::{ pub use repository::{
BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor,
}; };
pub use service::ActivityPubService; pub use service::ActivityPubService;
pub use user::{ApProfileField, ApUser, ApUserRepository, LookedUpActor}; pub use user::{ApActorType, ApProfileField, ApUser, ApUserRepository, LookedUpActor};

View File

@@ -27,6 +27,7 @@ pub struct OrderedCollection {
id: String, id: String,
total_items: u64, total_items: u64,
first: String, first: String,
last: String,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@@ -38,6 +39,7 @@ pub struct OrderedCollectionPage {
kind: String, kind: String,
id: String, id: String,
part_of: String, part_of: String,
total_items: u64,
ordered_items: Vec<serde_json::Value>, ordered_items: Vec<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
next: Option<String>, next: Option<String>,
@@ -59,6 +61,16 @@ pub async fn outbox_handler(
let outbox_url = format!("{}/users/{}/outbox", data.base_url, user_id_str); let outbox_url = format!("{}/users/{}/outbox", data.base_url, user_id_str);
// Total count — uses count_local_posts for an aggregated count. For a
// per-user count we use the page length on the first page as an upper bound
// if count_local_posts returns 0. In practice this trait method is called
// infrequently (only on the root collection endpoint).
let total = data
.object_handler
.count_local_posts()
.await
.map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?;
if query.page.unwrap_or(false) { if query.page.unwrap_or(false) {
let before: Option<DateTime<Utc>> = query.before.as_deref().and_then(|s| s.parse().ok()); let before: Option<DateTime<Utc>> = query.before.as_deref().and_then(|s| s.parse().ok());
@@ -114,24 +126,19 @@ pub async fn outbox_handler(
kind: "OrderedCollectionPage".to_string(), kind: "OrderedCollectionPage".to_string(),
id: page_id, id: page_id,
part_of: outbox_url, part_of: outbox_url,
total_items: total,
ordered_items, ordered_items,
next, next,
}) })
.into_response()) .into_response())
} else { } else {
let total = data
.object_handler
.get_local_objects_for_user(uuid)
.await
.map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?
.len() as u64;
Ok(axum::Json(OrderedCollection { Ok(axum::Json(OrderedCollection {
context: crate::urls::AP_CONTEXT.to_string(), context: crate::urls::AP_CONTEXT.to_string(),
kind: "OrderedCollection".to_string(), kind: "OrderedCollection".to_string(),
id: outbox_url.clone(), id: outbox_url.clone(),
total_items: total, total_items: total,
first: format!("{}?page=true", outbox_url), first: format!("{}?page=true", outbox_url),
last: format!("{}?page=true&before=1970-01-01T00:00:00.000Z", outbox_url),
}) })
.into_response()) .into_response())
} }

View File

@@ -139,4 +139,22 @@ pub trait FederationRepository: Send + Sync {
old_actor_url: &str, old_actor_url: &str,
new_actor_url: &str, new_actor_url: &str,
) -> Result<Vec<uuid::Uuid>>; ) -> Result<Vec<uuid::Uuid>>;
/// Return `true` if an activity with `activity_id` has already been processed.
/// Implementations should enforce a UNIQUE constraint on the stored activity IDs
/// so concurrent duplicate deliveries are safely rejected.
async fn is_activity_processed(&self, activity_id: &str) -> Result<bool>;
/// Record `activity_id` as processed. Called immediately before dispatching
/// each inbound activity so that retried deliveries are no-ops.
async fn mark_activity_processed(&self, activity_id: &str) -> Result<()>;
/// Return deduplicated inbox URLs (shared_inbox preferred over personal inbox)
/// for all **accepted** followers of `local_user_id`, excluding any actors or
/// domains that are blocked. Implementations should perform filtering and
/// deduplication in the database rather than in application memory.
async fn get_accepted_follower_inboxes(
&self,
local_user_id: uuid::Uuid,
) -> Result<Vec<String>>;
} }

File diff suppressed because it is too large Load Diff

View File

@@ -10,9 +10,9 @@ fn person_serializes_with_enriched_fields() {
.into(), .into(),
preferred_username: "alice".to_string(), preferred_username: "alice".to_string(),
inbox: "https://example.com/users/1/inbox".parse().unwrap(), inbox: "https://example.com/users/1/inbox".parse().unwrap(),
outbox: "https://example.com/users/1/outbox".parse().unwrap(), outbox: Some("https://example.com/users/1/outbox".parse().unwrap()),
followers: "https://example.com/users/1/followers".parse().unwrap(), followers: Some("https://example.com/users/1/followers".parse().unwrap()),
following: "https://example.com/users/1/following".parse().unwrap(), following: Some("https://example.com/users/1/following".parse().unwrap()),
public_key: PublicKey { public_key: PublicKey {
id: "https://example.com/users/1#main-key".to_string(), id: "https://example.com/users/1#main-key".to_string(),
owner: "https://example.com/users/1".parse().unwrap(), owner: "https://example.com/users/1".parse().unwrap(),

View File

@@ -1,45 +1,3 @@
use super::*; // Inbox deduplication (shared_inbox preference, blocked-actor/domain filtering)
use crate::repository::{Follower, FollowerStatus, RemoteActor}; // is now enforced by the repository implementation via `get_accepted_follower_inboxes`.
// Integration tests for broadcast delivery live in the consuming crate's test suite.
fn make_follower(inbox: &str, shared: Option<&str>) -> Follower {
Follower {
actor: RemoteActor {
url: format!("https://remote/{}", inbox),
handle: "user".to_string(),
inbox_url: inbox.to_string(),
shared_inbox_url: shared.map(|s| s.to_string()),
display_name: None,
avatar_url: None,
outbox_url: None,
},
status: FollowerStatus::Accepted,
}
}
#[test]
fn collect_inboxes_deduplicates_shared() {
let followers = vec![
make_follower(
"https://mastodon.social/users/a/inbox",
Some("https://mastodon.social/inbox"),
),
make_follower(
"https://mastodon.social/users/b/inbox",
Some("https://mastodon.social/inbox"),
),
make_follower("https://other.instance/users/c/inbox", None),
];
let inboxes = collect_inboxes(&followers);
assert_eq!(inboxes.len(), 2);
let strs: Vec<_> = inboxes.iter().map(|u| u.as_str()).collect();
assert!(strs.contains(&"https://mastodon.social/inbox"));
assert!(strs.contains(&"https://other.instance/users/c/inbox"));
}
#[test]
fn collect_inboxes_falls_back_to_individual_inbox() {
let followers = vec![make_follower("https://example.com/users/x/inbox", None)];
let inboxes = collect_inboxes(&followers);
assert_eq!(inboxes.len(), 1);
assert_eq!(inboxes[0].as_str(), "https://example.com/users/x/inbox");
}

View File

@@ -1,4 +1,5 @@
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -7,6 +8,22 @@ pub struct ApProfileField {
pub value: String, pub value: String,
} }
/// Actor type for AP serialization. Defaults to `Person`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ApActorType {
Person,
Service,
Application,
Organization,
Group,
}
impl Default for ApActorType {
fn default() -> Self {
Self::Person
}
}
/// Resolved actor data returned by [`crate::service::ActivityPubService::lookup_actor_by_handle`]. /// Resolved actor data returned by [`crate::service::ActivityPubService::lookup_actor_by_handle`].
/// Fetched via a signed HTTP request so strict instances (e.g. Threads) return full data. /// Fetched via a signed HTTP request so strict instances (e.g. Threads) return full data.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -29,12 +46,18 @@ pub struct LookedUpActor {
pub struct ApUser { pub struct ApUser {
pub id: uuid::Uuid, pub id: uuid::Uuid,
pub username: String, pub username: String,
pub display_name: Option<String>,
pub bio: Option<String>, pub bio: Option<String>,
pub avatar_url: Option<Url>, pub avatar_url: Option<Url>,
pub banner_url: Option<Url>, pub banner_url: Option<Url>,
pub also_known_as: Option<String>, pub also_known_as: Option<String>,
pub profile_url: Option<Url>, pub profile_url: Option<Url>,
pub attachment: Vec<ApProfileField>, pub attachment: Vec<ApProfileField>,
/// If true, incoming Follow requests must be manually approved before the
/// actor is listed as `manuallyApprovesFollowers=true` in AP JSON.
pub manually_approves_followers: bool,
/// AP actor type serialized in the actor JSON. Defaults to `Person`.
pub actor_type: ApActorType,
} }
#[async_trait] #[async_trait]

View File

@@ -1,13 +1,13 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data,
fetch::webfinger::{Webfinger, build_webfinger_response, extract_webfinger_name}, fetch::webfinger::{extract_webfinger_name},
}; };
use axum::{ use axum::{
extract::Query, extract::Query,
http::header, http::header,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use serde::Deserialize; use serde::{Deserialize, Serialize};
use crate::data::FederationData; use crate::data::FederationData;
use crate::error::Error; use crate::error::Error;
@@ -17,6 +17,23 @@ pub struct WebfingerQuery {
resource: String, resource: String,
} }
#[derive(Serialize)]
struct WebfingerLink {
rel: String,
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
kind: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
href: Option<String>,
}
#[derive(Serialize)]
struct WebfingerResponse {
subject: String,
/// Canonical URIs for the same account (acct: URI + AP actor URL).
aliases: Vec<String>,
links: Vec<WebfingerLink>,
}
pub async fn webfinger_handler( pub async fn webfinger_handler(
Query(query): Query<WebfingerQuery>, Query(query): Query<WebfingerQuery>,
data: Data<FederationData>, data: Data<FederationData>,
@@ -31,8 +48,25 @@ pub async fn webfinger_handler(
.ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?;
let ap_id = crate::urls::actor_url(&data.base_url, user.id); let ap_id = crate::urls::actor_url(&data.base_url, user.id);
let acct_uri = format!("acct:{}@{}", user.username, data.domain);
let wf = WebfingerResponse {
subject: query.resource.clone(),
aliases: vec![acct_uri, ap_id.to_string()],
links: vec![
WebfingerLink {
rel: "http://webfinger.net/rel/profile-page".to_string(),
kind: Some("text/html".to_string()),
href: Some(ap_id.to_string()),
},
WebfingerLink {
rel: "self".to_string(),
kind: Some("application/activity+json".to_string()),
href: Some(ap_id.to_string()),
},
],
};
let wf: Webfinger = build_webfinger_response(query.resource, ap_id);
let body = serde_json::to_string(&wf).map_err(|e| Error::from(anyhow::anyhow!(e)))?; let body = serde_json::to_string(&wf).map_err(|e| Error::from(anyhow::anyhow!(e)))?;
Ok(([(header::CONTENT_TYPE, "application/jrd+json")], body).into_response()) Ok(([(header::CONTENT_TYPE, "application/jrd+json")], body).into_response())
} }