diff --git a/src/activities/move_act.rs b/src/activities/move_act.rs index 3594286..858fd71 100644 --- a/src/activities/move_act.rs +++ b/src/activities/move_act.rs @@ -56,63 +56,85 @@ impl Activity for MoveActivity { .dereference(data) .await .map_err(|e| Error::from(anyhow::anyhow!("{e}")))?; - if target.also_known_as.as_deref() != Some(self.object.as_str()) { + // Verify the new actor claims the old identity via alsoKnownAs. + // The spec allows multiple aliases; check all of them. + let old_url = self.object.as_str(); + if !target.also_known_as.iter().any(|a| a == old_url) { return Err(Error::bad_request(anyhow::anyhow!( "Move target alsoKnownAs does not reference old actor" ))); } let affected = data .follow_repo - .migrate_follower_actor(self.object.as_str(), self.target.as_str()) + .migrate_follower_actor(old_url, self.target.as_str()) .await .map_err(|e| Error::from(anyhow::anyhow!("{e}")))?; let affected_count = affected.len(); - for local_user_id in &affected { - let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await { - Ok(a) => a, - Err(e) => { - tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor"); - continue; - } - }; - let follow_id = match crate::urls::activity_url(&data.base_url) { - Ok(u) => u, - Err(e) => { - tracing::warn!(error = %e, "Move: failed to generate follow activity URL"); - continue; - } - }; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: ObjectId::from(self.target.clone()), - }; - let sends = match SendActivityTask::prepare( - &WithContext::new_default(follow), - &local_actor, - vec![target.inbox_url.clone()], - data, - ) - .await - { - Ok(s) => s, - Err(e) => { - tracing::warn!(error = %e, "Move: failed to prepare re-follow"); - continue; - } - }; - for send in sends { - if let Err(e) = send.sign_and_send(data).await { - tracing::warn!(error = %e, %local_user_id, "Move: re-follow delivery failed"); + + // Spawn re-follows in the background — do NOT await them inside receive() + // to avoid blocking the inbox handler while making outbound HTTP requests. + let target_inbox = target.inbox_url.clone(); + let target_url = self.target.clone(); + let base_url = data.base_url.clone(); + let data_clone = data.clone(); + tokio::spawn(async move { + for local_user_id in &affected { + let local_actor = + match crate::actors::get_local_actor(*local_user_id, &data_clone).await { + Ok(a) => a, + Err(e) => { + tracing::warn!( + error = %e, + %local_user_id, + "Move: failed to load local actor" + ); + continue; + } + }; + let follow_id = match crate::urls::activity_url(&base_url) { + Ok(u) => u, + Err(e) => { + tracing::warn!(error = %e, "Move: failed to generate follow activity URL"); + continue; + } + }; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(target_url.clone()), + }; + let sends = match SendActivityTask::prepare( + &WithContext::new_default(follow), + &local_actor, + vec![target_inbox.clone()], + &data_clone, + ) + .await + { + Ok(s) => s, + Err(e) => { + tracing::warn!(error = %e, "Move: failed to prepare re-follow"); + continue; + } + }; + for send in sends { + if let Err(e) = send.sign_and_send(&data_clone).await { + tracing::warn!( + error = %e, + %local_user_id, + "Move: re-follow delivery failed" + ); + } } } - } + }); + tracing::info!( actor = %self.actor.inner(), target = %self.target, affected = affected_count, - "received Move — migrated follower relationships" + "received Move — migrated follower relationships, re-follows spawned" ); Ok(()) } diff --git a/src/actors.rs b/src/actors.rs index c9c09b7..9b2138e 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -34,7 +34,7 @@ pub struct DbActor { pub bio: Option, pub avatar_url: Option, pub banner_url: Option, - pub also_known_as: Option, + pub also_known_as: Vec, pub profile_url: Option, pub attachment: Vec, pub manually_approves_followers: bool, @@ -283,7 +283,7 @@ impl Object for DbActor { kind: "Image".to_string(), url, }); - let also_known_as: Vec = self.also_known_as.into_iter().collect(); + let also_known_as = self.also_known_as; let attachment: Vec = self .attachment .into_iter() @@ -395,7 +395,7 @@ impl Object for DbActor { bio: json.summary.clone(), avatar_url: json.icon.as_ref().map(|i| i.url.clone()), banner_url: json.image.as_ref().map(|i| i.url.clone()), - also_known_as: json.also_known_as.into_iter().next(), + also_known_as: json.also_known_as, profile_url: json.url.clone(), attachment: json .attachment diff --git a/src/content.rs b/src/content.rs index 598aace..cf224e2 100644 --- a/src/content.rs +++ b/src/content.rs @@ -3,18 +3,19 @@ use chrono::{DateTime, Utc}; use url::Url; /// Read side — the library queries this when sending content outward. -/// Implement on the same struct as [`ApObjectHandler`] if you prefer -/// a single database type. +/// Implement on the same struct as [`ApObjectHandler`] if you prefer a single +/// database type. #[async_trait] pub trait ApContentReader: Send + Sync { - /// All locally-authored objects for this user. Used by backfill on accept_follower. - async fn get_local_objects_for_user( - &self, - user_id: uuid::Uuid, - ) -> anyhow::Result>; - - /// Newest-first page of locally-authored objects, published before `before`. - /// Returns `(ap_id, object_json, published_at)`. Used by the outbox endpoint. + /// Newest-first page of locally-authored objects for `user_id`, published + /// strictly before `before` (pass `None` for the first page). + /// Returns `(ap_id, object_json, published_at)` tuples. + /// + /// Used by the outbox endpoint and by backfill when a new follower is + /// accepted. Implementations MUST: + /// - Return objects in descending `published_at` order. + /// - Exclude deleted and draft content. + /// - Be consistent across pages (no duplicates, no gaps). async fn get_local_objects_page( &self, user_id: uuid::Uuid, @@ -27,8 +28,22 @@ pub trait ApContentReader: Send + Sync { } /// Write side — the library calls these when processing inbound AP activities. +/// +/// All methods are called after HTTP signature verification has passed. +/// Returning `Err` propagates a 500 back to the remote server, which will +/// trigger a retry from well-behaved implementations. Return `Ok(())` to +/// silently accept an activity you don't want to act on. +/// +/// **Idempotency:** Methods may be called more than once for the same activity +/// (e.g. under a race during duplicate delivery). Implementations should be +/// idempotent — prefer upsert over insert. #[async_trait] pub trait ApObjectHandler: Send + Sync { + /// A remote actor published new content. + /// + /// `ap_id` is the stable URL of the object (e.g. the Note URL, not the + /// Create activity URL). Store or index the `object` JSON as appropriate + /// for your domain. async fn on_create( &self, ap_id: &Url, @@ -36,6 +51,10 @@ pub trait ApObjectHandler: Send + Sync { object: serde_json::Value, ) -> anyhow::Result<()>; + /// A remote actor edited existing content. + /// + /// `ap_id` matches a previously received `on_create` call. Update the + /// stored object. async fn on_update( &self, ap_id: &Url, @@ -43,18 +62,42 @@ pub trait ApObjectHandler: Send + Sync { object: serde_json::Value, ) -> anyhow::Result<()>; + /// A remote actor deleted an object previously delivered via `on_create`. async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()>; + /// A remote actor was deleted or has unfollowed all local users. + /// + /// Remove all content and state associated with `actor_url` from local + /// storage. Called for `Delete(actor)` and for `Undo(Follow)`. async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>; + /// A remote actor liked a locally-authored object. async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; + /// A remote actor removed their like from a locally-authored object. async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; + /// A remote actor boosted (Announced) a **locally-authored** object. + /// + /// `object_url` is your local object's AP URL. The boost count is tracked + /// separately in [`crate::repository::ActorRepository::count_announces`]. async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; + /// A remote actor boosted an object hosted on a **different server**. + /// + /// Use this to surface cross-server boosts in local feeds. Called instead + /// of `on_announce_received` when the announced object URL is external. + /// Failures are logged and swallowed — they do not fail the activity. async fn on_announce_of_remote(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; + /// A local user was tagged (Mentioned) in an inbound Create or Update. + /// + /// Called for every `{"type":"Mention","href":""}` tag + /// found in inbound activities. Use this to send in-app notifications. + /// The note content is also delivered independently via `on_create`. + /// + /// Failures are logged and swallowed — a broken notification must not + /// cause the activity to be rejected. async fn on_mention( &self, thought_ap_id: &Url, diff --git a/src/repository/follow.rs b/src/repository/follow.rs index 3169a6e..303c184 100644 --- a/src/repository/follow.rs +++ b/src/repository/follow.rs @@ -43,6 +43,16 @@ pub trait FollowRepository: Send + Sync { /// followers, excluding blocked actors/domains. DB-side filtering. async fn get_accepted_follower_inboxes(&self, local_user_id: uuid::Uuid) -> Result>; + /// Count of accepted followers only. More efficient than loading all followers + /// and filtering in application memory. + async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result; + /// Accepted followers page for display purposes. `offset` is 0-based. + async fn get_accepted_followers_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result>; // ── Outbound following ────────────────────────────────────────────────── async fn add_following( diff --git a/src/service/backfill.rs b/src/service/backfill.rs index a4e70ea..16b4736 100644 --- a/src/service/backfill.rs +++ b/src/service/backfill.rs @@ -8,7 +8,19 @@ use crate::{activities::CreateActivity, actors::get_local_actor, federation::ApF use super::{ActivityPubService, delivery::send_with_retry}; impl ActivityPubService { - pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { + /// Fetch a remote actor's outbox and import its content into the local instance. + /// + /// This is for importing a **remote actor's history** — for example, when you want + /// to surface an account's past posts after a local user follows them. It fetches + /// pages from `outbox_url` and calls `ApObjectHandler::on_create` for each item. + /// + /// This is distinct from [`ActivityPubService::run_backfill_for_follower`], which + /// sends **your** local content to a new follower's inbox. + pub async fn import_remote_outbox( + &self, + outbox_url: &str, + actor_url: &str, + ) -> anyhow::Result<()> { let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs( super::HTTP_FETCH_TIMEOUT_SECS, diff --git a/src/service/broadcast.rs b/src/service/broadcast.rs index efc484c..d1ea041 100644 --- a/src/service/broadcast.rs +++ b/src/service/broadcast.rs @@ -226,21 +226,53 @@ impl ActivityPubService { .await } + /// Fan out a Create(Note) activity to accepted followers and any explicitly + /// mentioned actors. + /// + /// `visibility` controls `to`/`cc` addressing and whether the note is public: + /// - `Public` / `FollowersOnly`: delivered to followers + `mentioned_inboxes` + /// - `Private`: returns immediately — no delivery to anyone + /// + /// `mentioned_inboxes` should contain the inbox URLs of remote actors + /// explicitly tagged in the note who are not already followers. Resolve them + /// via [`ActivityPubService::lookup_actor_by_handle`] before calling. Pass an + /// empty `Vec` if there are no external mentions. pub async fn broadcast_create_note( &self, local_user_id: uuid::Uuid, note: serde_json::Value, visibility: ApVisibility, + mentioned_inboxes: Vec, ) -> anyhow::Result<()> { if visibility == ApVisibility::Private { return Ok(()); } let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { + let local_actor = crate::actors::get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + // Merge follower inboxes with explicitly mentioned actor inboxes, + // deduplicating by string to avoid delivering the same inbox twice. + let follower_inboxes = data + .follow_repo + .get_accepted_follower_inboxes(local_user_id) + .await?; + let mut seen = std::collections::HashSet::new(); + let mut inboxes: Vec = follower_inboxes + .into_iter() + .filter_map(|s| Url::parse(&s).ok()) + .filter(|u| seen.insert(u.to_string())) + .collect(); + for inbox in mentioned_inboxes { + if seen.insert(inbox.to_string()) { + inboxes.push(inbox); + } + } + if inboxes.is_empty() { return Ok(()); - }; + } + let note_id_str = note["id"].as_str().unwrap_or(""); let create_id = Url::parse(&format!( "{}/activities/create/{}", @@ -266,21 +298,42 @@ impl ActivityPubService { .await } + /// Fan out an Update(Note) activity to accepted followers and mentioned actors. + /// See [`broadcast_create_note`] for `mentioned_inboxes` semantics. pub async fn broadcast_update_note( &self, local_user_id: uuid::Uuid, note: serde_json::Value, visibility: ApVisibility, + mentioned_inboxes: Vec, ) -> anyhow::Result<()> { if visibility == ApVisibility::Private { return Ok(()); } let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { + let local_actor = crate::actors::get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let follower_inboxes = data + .follow_repo + .get_accepted_follower_inboxes(local_user_id) + .await?; + let mut seen = std::collections::HashSet::new(); + let mut inboxes: Vec = follower_inboxes + .into_iter() + .filter_map(|s| Url::parse(&s).ok()) + .filter(|u| seen.insert(u.to_string())) + .collect(); + for inbox in mentioned_inboxes { + if seen.insert(inbox.to_string()) { + inboxes.push(inbox); + } + } + if inboxes.is_empty() { return Ok(()); - }; + } + let (to, cc) = visibility_addressing(visibility, &local_actor.followers_url); let update = crate::activities::UpdateActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, diff --git a/src/service/follow.rs b/src/service/follow.rs index cd3f15a..3584a40 100644 --- a/src/service/follow.rs +++ b/src/service/follow.rs @@ -209,6 +209,21 @@ impl ActivityPubService { data.follow_repo.get_pending_followers(local_user_id).await } + /// Returns one page of accepted followers. Prefer this over `get_accepted_followers` + /// for large accounts — the DB does the filtering rather than loading everything. + pub async fn get_accepted_followers_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.follow_repo + .get_accepted_followers_page(local_user_id, offset, limit) + .await + } + + /// Returns ALL accepted followers. For large accounts use `get_accepted_followers_page`. pub async fn get_accepted_followers( &self, local_user_id: uuid::Uuid, @@ -224,18 +239,15 @@ impl ActivityPubService { .collect()) } + /// Count of accepted followers — DB-side query, no in-memory filtering. pub async fn count_accepted_followers( &self, local_user_id: uuid::Uuid, ) -> anyhow::Result { let data = self.federation_config.to_request_data(); - Ok(data - .follow_repo - .get_followers(local_user_id) - .await? - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .count()) + data.follow_repo + .count_accepted_followers(local_user_id) + .await } pub async fn get_following( diff --git a/src/tests/integration.rs b/src/tests/integration.rs index 5e0da1c..ddd851e 100644 --- a/src/tests/integration.rs +++ b/src/tests/integration.rs @@ -88,6 +88,17 @@ impl FollowRepository for MemFollowRepo { async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } + async fn count_accepted_followers(&self, _: uuid::Uuid) -> anyhow::Result { + Ok(0) + } + async fn get_accepted_followers_page( + &self, + _: uuid::Uuid, + _: u32, + _: usize, + ) -> anyhow::Result> { + Ok(vec![]) + } async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> { Ok(()) } @@ -246,7 +257,7 @@ impl MemUserRepo { bio: None, avatar_url: None, banner_url: None, - also_known_as: None, + also_known_as: vec![], profile_url: None, attachment: vec![], manually_approves_followers: true, @@ -283,12 +294,6 @@ struct MemContentReader; #[async_trait] impl ApContentReader for MemContentReader { - async fn get_local_objects_for_user( - &self, - _: uuid::Uuid, - ) -> anyhow::Result> { - Ok(vec![]) - } async fn get_local_objects_page( &self, _: uuid::Uuid, diff --git a/src/user.rs b/src/user.rs index 0ac986e..540a08d 100644 --- a/src/user.rs +++ b/src/user.rs @@ -48,7 +48,7 @@ pub struct LookedUpActor { pub outbox_url: Option, pub followers_url: Option, pub following_url: Option, - pub also_known_as: Option, + pub also_known_as: Vec, pub profile_url: Option, pub attachment: Vec, } @@ -61,7 +61,7 @@ pub struct ApUser { pub bio: Option, pub avatar_url: Option, pub banner_url: Option, - pub also_known_as: Option, + pub also_known_as: Vec, pub profile_url: Option, pub attachment: Vec, /// If true, incoming Follow requests must be manually approved before the