fix: pre-release improvements — scale, correctness, API clarity

#1  count_accepted_followers / get_accepted_followers_page: new DB-side
    methods on FollowRepository — no more loading all followers into memory
    to count or page them.

#2  Move re-follows are now non-blocking: tokio::spawn instead of
    awaiting each sign_and_send inside receive() — inbox handler no longer
    stalls for slow remote servers during account migration.

#3  Remove get_local_objects_for_user from ApContentReader (dead code).
    Backfill and outbox both use the paginated get_local_objects_page.

#6  Rename backfill_outbox → import_remote_outbox with a clear doc
    explaining the direction (it imports FROM a remote server; it does not
    send content TO a follower).

#7  also_known_as: Option<String> → Vec<String> on ApUser, LookedUpActor,
    and DbActor. from_json now stores all aliases; move_act.rs checks all.

Mentions: broadcast_create_note / broadcast_update_note now accept
    mentioned_inboxes: Vec<Url> — delivery goes to followers + mentioned
    actors who aren't already followers. Deduplication is done before
    sending. Pass vec![] if the note has no external mentions.

Docs: ApObjectHandler and ApContentReader now have complete doc comments
    with contracts, idempotency guidance, and error-handling semantics.
This commit is contained in:
2026-05-29 02:19:39 +02:00
parent d5f75b4b57
commit 5288696795
9 changed files with 237 additions and 80 deletions

View File

@@ -56,26 +56,42 @@ impl Activity for MoveActivity {
.dereference(data)
.await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
if target.also_known_as.as_deref() != Some(self.object.as_str()) {
// Verify the new actor claims the old identity via alsoKnownAs.
// The spec allows multiple aliases; check all of them.
let old_url = self.object.as_str();
if !target.also_known_as.iter().any(|a| a == old_url) {
return Err(Error::bad_request(anyhow::anyhow!(
"Move target alsoKnownAs does not reference old actor"
)));
}
let affected = data
.follow_repo
.migrate_follower_actor(self.object.as_str(), self.target.as_str())
.migrate_follower_actor(old_url, self.target.as_str())
.await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
let affected_count = affected.len();
// Spawn re-follows in the background — do NOT await them inside receive()
// to avoid blocking the inbox handler while making outbound HTTP requests.
let target_inbox = target.inbox_url.clone();
let target_url = self.target.clone();
let base_url = data.base_url.clone();
let data_clone = data.clone();
tokio::spawn(async move {
for local_user_id in &affected {
let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await {
let local_actor =
match crate::actors::get_local_actor(*local_user_id, &data_clone).await {
Ok(a) => a,
Err(e) => {
tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor");
tracing::warn!(
error = %e,
%local_user_id,
"Move: failed to load local actor"
);
continue;
}
};
let follow_id = match crate::urls::activity_url(&data.base_url) {
let follow_id = match crate::urls::activity_url(&base_url) {
Ok(u) => u,
Err(e) => {
tracing::warn!(error = %e, "Move: failed to generate follow activity URL");
@@ -86,13 +102,13 @@ impl Activity for MoveActivity {
id: follow_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object: ObjectId::from(self.target.clone()),
object: ObjectId::from(target_url.clone()),
};
let sends = match SendActivityTask::prepare(
&WithContext::new_default(follow),
&local_actor,
vec![target.inbox_url.clone()],
data,
vec![target_inbox.clone()],
&data_clone,
)
.await
{
@@ -103,16 +119,22 @@ impl Activity for MoveActivity {
}
};
for send in sends {
if let Err(e) = send.sign_and_send(data).await {
tracing::warn!(error = %e, %local_user_id, "Move: re-follow delivery failed");
if let Err(e) = send.sign_and_send(&data_clone).await {
tracing::warn!(
error = %e,
%local_user_id,
"Move: re-follow delivery failed"
);
}
}
}
});
tracing::info!(
actor = %self.actor.inner(),
target = %self.target,
affected = affected_count,
"received Move — migrated follower relationships"
"received Move — migrated follower relationships, re-follows spawned"
);
Ok(())
}

View File

@@ -34,7 +34,7 @@ pub struct DbActor {
pub bio: Option<String>,
pub avatar_url: Option<Url>,
pub banner_url: Option<Url>,
pub also_known_as: Option<String>,
pub also_known_as: Vec<String>,
pub profile_url: Option<Url>,
pub attachment: Vec<ApProfileField>,
pub manually_approves_followers: bool,
@@ -283,7 +283,7 @@ impl Object for DbActor {
kind: "Image".to_string(),
url,
});
let also_known_as: Vec<String> = self.also_known_as.into_iter().collect();
let also_known_as = self.also_known_as;
let attachment: Vec<ProfileFieldObject> = self
.attachment
.into_iter()
@@ -395,7 +395,7 @@ impl Object for DbActor {
bio: json.summary.clone(),
avatar_url: json.icon.as_ref().map(|i| i.url.clone()),
banner_url: json.image.as_ref().map(|i| i.url.clone()),
also_known_as: json.also_known_as.into_iter().next(),
also_known_as: json.also_known_as,
profile_url: json.url.clone(),
attachment: json
.attachment

View File

@@ -3,18 +3,19 @@ use chrono::{DateTime, Utc};
use url::Url;
/// Read side — the library queries this when sending content outward.
/// Implement on the same struct as [`ApObjectHandler`] if you prefer
/// a single database type.
/// Implement on the same struct as [`ApObjectHandler`] if you prefer a single
/// database type.
#[async_trait]
pub trait ApContentReader: Send + Sync {
/// All locally-authored objects for this user. Used by backfill on accept_follower.
async fn get_local_objects_for_user(
&self,
user_id: uuid::Uuid,
) -> anyhow::Result<Vec<(Url, serde_json::Value)>>;
/// Newest-first page of locally-authored objects, published before `before`.
/// Returns `(ap_id, object_json, published_at)`. Used by the outbox endpoint.
/// Newest-first page of locally-authored objects for `user_id`, published
/// strictly before `before` (pass `None` for the first page).
/// Returns `(ap_id, object_json, published_at)` tuples.
///
/// Used by the outbox endpoint and by backfill when a new follower is
/// accepted. Implementations MUST:
/// - Return objects in descending `published_at` order.
/// - Exclude deleted and draft content.
/// - Be consistent across pages (no duplicates, no gaps).
async fn get_local_objects_page(
&self,
user_id: uuid::Uuid,
@@ -27,8 +28,22 @@ pub trait ApContentReader: Send + Sync {
}
/// Write side — the library calls these when processing inbound AP activities.
///
/// All methods are called after HTTP signature verification has passed.
/// Returning `Err` propagates a 500 back to the remote server, which will
/// trigger a retry from well-behaved implementations. Return `Ok(())` to
/// silently accept an activity you don't want to act on.
///
/// **Idempotency:** Methods may be called more than once for the same activity
/// (e.g. under a race during duplicate delivery). Implementations should be
/// idempotent — prefer upsert over insert.
#[async_trait]
pub trait ApObjectHandler: Send + Sync {
/// A remote actor published new content.
///
/// `ap_id` is the stable URL of the object (e.g. the Note URL, not the
/// Create activity URL). Store or index the `object` JSON as appropriate
/// for your domain.
async fn on_create(
&self,
ap_id: &Url,
@@ -36,6 +51,10 @@ pub trait ApObjectHandler: Send + Sync {
object: serde_json::Value,
) -> anyhow::Result<()>;
/// A remote actor edited existing content.
///
/// `ap_id` matches a previously received `on_create` call. Update the
/// stored object.
async fn on_update(
&self,
ap_id: &Url,
@@ -43,18 +62,42 @@ pub trait ApObjectHandler: Send + Sync {
object: serde_json::Value,
) -> anyhow::Result<()>;
/// A remote actor deleted an object previously delivered via `on_create`.
async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// A remote actor was deleted or has unfollowed all local users.
///
/// Remove all content and state associated with `actor_url` from local
/// storage. Called for `Delete(actor)` and for `Undo(Follow)`.
async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>;
/// A remote actor liked a locally-authored object.
async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// A remote actor removed their like from a locally-authored object.
async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// A remote actor boosted (Announced) a **locally-authored** object.
///
/// `object_url` is your local object's AP URL. The boost count is tracked
/// separately in [`crate::repository::ActorRepository::count_announces`].
async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// A remote actor boosted an object hosted on a **different server**.
///
/// Use this to surface cross-server boosts in local feeds. Called instead
/// of `on_announce_received` when the announced object URL is external.
/// Failures are logged and swallowed — they do not fail the activity.
async fn on_announce_of_remote(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// A local user was tagged (Mentioned) in an inbound Create or Update.
///
/// Called for every `{"type":"Mention","href":"<local-actor-url>"}` tag
/// found in inbound activities. Use this to send in-app notifications.
/// The note content is also delivered independently via `on_create`.
///
/// Failures are logged and swallowed — a broken notification must not
/// cause the activity to be rejected.
async fn on_mention(
&self,
thought_ap_id: &Url,

View File

@@ -43,6 +43,16 @@ pub trait FollowRepository: Send + Sync {
/// followers, excluding blocked actors/domains. DB-side filtering.
async fn get_accepted_follower_inboxes(&self, local_user_id: uuid::Uuid)
-> Result<Vec<String>>;
/// Count of accepted followers only. More efficient than loading all followers
/// and filtering in application memory.
async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result<usize>;
/// One page of accepted followers, intended for display purposes.
///
/// `offset` is 0-based.
///
/// NOTE(review): `limit` is presumably the maximum number of actors in the
/// returned page, and the ordering of followers is not specified by this
/// trait — confirm against the implementations.
/// NOTE(review): `offset` is `u32` while `limit` is `usize`; consider whether
/// the two should share an integer type.
async fn get_accepted_followers_page(
&self,
local_user_id: uuid::Uuid,
offset: u32,
limit: usize,
) -> Result<Vec<RemoteActor>>;
// ── Outbound following ──────────────────────────────────────────────────
async fn add_following(

View File

@@ -8,7 +8,19 @@ use crate::{activities::CreateActivity, actors::get_local_actor, federation::ApF
use super::{ActivityPubService, delivery::send_with_retry};
impl ActivityPubService {
pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
/// Fetch a remote actor's outbox and import its content into the local instance.
///
/// This is for importing a **remote actor's history** — for example, when you want
/// to surface an account's past posts after a local user follows them. It fetches
/// pages from `outbox_url` and calls `ApObjectHandler::on_create` for each item.
///
/// This is distinct from [`ActivityPubService::run_backfill_for_follower`], which
/// sends **your** local content to a new follower's inbox.
pub async fn import_remote_outbox(
&self,
outbox_url: &str,
actor_url: &str,
) -> anyhow::Result<()> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(
super::HTTP_FETCH_TIMEOUT_SECS,

View File

@@ -226,21 +226,53 @@ impl ActivityPubService {
.await
}
/// Fan out a Create(Note) activity to accepted followers and any explicitly
/// mentioned actors.
///
/// `visibility` controls `to`/`cc` addressing and whether the note is public:
/// - `Public` / `FollowersOnly`: delivered to followers + `mentioned_inboxes`
/// - `Private`: returns immediately — no delivery to anyone
///
/// `mentioned_inboxes` should contain the inbox URLs of remote actors
/// explicitly tagged in the note who are not already followers. Resolve them
/// via [`ActivityPubService::lookup_actor_by_handle`] before calling. Pass an
/// empty `Vec` if there are no external mentions.
pub async fn broadcast_create_note(
&self,
local_user_id: uuid::Uuid,
note: serde_json::Value,
visibility: ApVisibility,
mentioned_inboxes: Vec<Url>,
) -> anyhow::Result<()> {
if visibility == ApVisibility::Private {
return Ok(());
}
let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
let local_actor = crate::actors::get_local_actor(local_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
// Merge follower inboxes with explicitly mentioned actor inboxes,
// deduplicating by string to avoid delivering the same inbox twice.
let follower_inboxes = data
.follow_repo
.get_accepted_follower_inboxes(local_user_id)
.await?;
let mut seen = std::collections::HashSet::new();
let mut inboxes: Vec<Url> = follower_inboxes
.into_iter()
.filter_map(|s| Url::parse(&s).ok())
.filter(|u| seen.insert(u.to_string()))
.collect();
for inbox in mentioned_inboxes {
if seen.insert(inbox.to_string()) {
inboxes.push(inbox);
}
}
if inboxes.is_empty() {
return Ok(());
};
}
let note_id_str = note["id"].as_str().unwrap_or("");
let create_id = Url::parse(&format!(
"{}/activities/create/{}",
@@ -266,21 +298,42 @@ impl ActivityPubService {
.await
}
/// Fan out an Update(Note) activity to accepted followers and mentioned actors.
/// See [`ActivityPubService::broadcast_create_note`] for `mentioned_inboxes` semantics.
pub async fn broadcast_update_note(
&self,
local_user_id: uuid::Uuid,
note: serde_json::Value,
visibility: ApVisibility,
mentioned_inboxes: Vec<Url>,
) -> anyhow::Result<()> {
if visibility == ApVisibility::Private {
return Ok(());
}
let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
let local_actor = crate::actors::get_local_actor(local_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let follower_inboxes = data
.follow_repo
.get_accepted_follower_inboxes(local_user_id)
.await?;
let mut seen = std::collections::HashSet::new();
let mut inboxes: Vec<Url> = follower_inboxes
.into_iter()
.filter_map(|s| Url::parse(&s).ok())
.filter(|u| seen.insert(u.to_string()))
.collect();
for inbox in mentioned_inboxes {
if seen.insert(inbox.to_string()) {
inboxes.push(inbox);
}
}
if inboxes.is_empty() {
return Ok(());
};
}
let (to, cc) = visibility_addressing(visibility, &local_actor.followers_url);
let update = crate::activities::UpdateActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,

View File

@@ -209,6 +209,21 @@ impl ActivityPubService {
data.follow_repo.get_pending_followers(local_user_id).await
}
/// Fetches a single page of accepted followers for `local_user_id`.
///
/// This is a thin wrapper: it obtains request data from the federation config
/// and forwards `local_user_id`, `offset` and `limit` unchanged to the follow
/// repository's `get_accepted_followers_page`. Prefer this over
/// `get_accepted_followers` for large accounts — the DB does the filtering
/// rather than loading everything.
///
/// # Errors
///
/// Returns whatever error the follow repository reports.
pub async fn get_accepted_followers_page(
    &self,
    local_user_id: uuid::Uuid,
    offset: u32,
    limit: usize,
) -> anyhow::Result<Vec<RemoteActor>> {
    let request_data = self.federation_config.to_request_data();
    let follow_repo = &request_data.follow_repo;
    follow_repo
        .get_accepted_followers_page(local_user_id, offset, limit)
        .await
}
/// Returns ALL accepted followers. For large accounts use `get_accepted_followers_page`.
pub async fn get_accepted_followers(
&self,
local_user_id: uuid::Uuid,
@@ -224,18 +239,15 @@ impl ActivityPubService {
.collect())
}
/// Count of accepted followers — DB-side query, no in-memory filtering.
pub async fn count_accepted_followers(
&self,
local_user_id: uuid::Uuid,
) -> anyhow::Result<usize> {
let data = self.federation_config.to_request_data();
Ok(data
.follow_repo
.get_followers(local_user_id)
.await?
.into_iter()
.filter(|f| f.status == FollowerStatus::Accepted)
.count())
data.follow_repo
.count_accepted_followers(local_user_id)
.await
}
pub async fn get_following(

View File

@@ -88,6 +88,17 @@ impl FollowRepository for MemFollowRepo {
// Stub: ignores the user ID and always reports an empty list of accepted
// follower inboxes.
async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> {
Ok(vec![])
}
// Stub: ignores the user ID and always reports zero accepted followers.
async fn count_accepted_followers(&self, _: uuid::Uuid) -> anyhow::Result<usize> {
Ok(0)
}
// Stub: ignores the user ID, offset and limit, and always returns an empty
// page of accepted followers.
async fn get_accepted_followers_page(
&self,
_: uuid::Uuid,
_: u32,
_: usize,
) -> anyhow::Result<Vec<RemoteActor>> {
Ok(vec![])
}
// Stub: accepts any arguments, records nothing, and always succeeds.
async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> {
Ok(())
}
@@ -246,7 +257,7 @@ impl MemUserRepo {
bio: None,
avatar_url: None,
banner_url: None,
also_known_as: None,
also_known_as: vec![],
profile_url: None,
attachment: vec![],
manually_approves_followers: true,
@@ -283,12 +294,6 @@ struct MemContentReader;
#[async_trait]
impl ApContentReader for MemContentReader {
async fn get_local_objects_for_user(
&self,
_: uuid::Uuid,
) -> anyhow::Result<Vec<(Url, serde_json::Value)>> {
Ok(vec![])
}
async fn get_local_objects_page(
&self,
_: uuid::Uuid,

View File

@@ -48,7 +48,7 @@ pub struct LookedUpActor {
pub outbox_url: Option<Url>,
pub followers_url: Option<Url>,
pub following_url: Option<Url>,
pub also_known_as: Option<String>,
pub also_known_as: Vec<String>,
pub profile_url: Option<Url>,
pub attachment: Vec<ApProfileField>,
}
@@ -61,7 +61,7 @@ pub struct ApUser {
pub bio: Option<String>,
pub avatar_url: Option<Url>,
pub banner_url: Option<Url>,
pub also_known_as: Option<String>,
pub also_known_as: Vec<String>,
pub profile_url: Option<Url>,
pub attachment: Vec<ApProfileField>,
/// If true, incoming Follow requests must be manually approved before the