4 Commits

5 changed files with 183 additions and 3 deletions

2
Cargo.lock generated
View File

@@ -1368,7 +1368,7 @@ dependencies = [
[[package]] [[package]]
name = "k-ap" name = "k-ap"
version = "0.1.7" version = "0.1.9"
dependencies = [ dependencies = [
"activitypub_federation", "activitypub_federation",
"anyhow", "anyhow",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "k-ap" name = "k-ap"
version = "0.1.8" version = "0.1.10"
edition = "2024" edition = "2024"
description = "Generic ActivityPub protocol layer" description = "Generic ActivityPub protocol layer"
license = "MIT" license = "MIT"

View File

@@ -837,10 +837,86 @@ impl Activity for MoveActivity {
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
return Ok(()); return Ok(());
} }
// Fetch the target actor via signed request.
let target = ObjectId::<DbActor>::from(self.target.clone())
.dereference(data)
.await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
// Verify the new actor claims the old identity via alsoKnownAs.
let old_url = self.object.as_str();
if target.also_known_as.as_deref() != Some(old_url) {
return Err(Error::bad_request(anyhow::anyhow!(
"Move target alsoKnownAs does not reference old actor"
)));
}
// Migrate DB records; get user IDs that need a re-follow.
let affected = data
.federation_repo
.migrate_follower_actor(old_url, self.target.as_str())
.await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
let affected_count = affected.len();
// Re-follow on behalf of each affected local user.
for local_user_id in &affected {
let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await {
Ok(a) => a,
Err(e) => {
tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor for re-follow");
continue;
}
};
let follow_id = match crate::urls::activity_url(&data.base_url) {
Ok(u) => u,
Err(e) => {
tracing::warn!(error = %e, "Move: failed to generate follow activity URL");
continue;
}
};
let follow = FollowActivity {
id: follow_id,
kind: Default::default(),
actor: activitypub_federation::fetch::object_id::ObjectId::from(
local_actor.ap_id.clone(),
),
object: activitypub_federation::fetch::object_id::ObjectId::from(
self.target.clone(),
),
};
let sends = match activitypub_federation::activity_sending::SendActivityTask::prepare(
&activitypub_federation::protocol::context::WithContext::new_default(follow),
&local_actor,
vec![target.inbox_url.clone()],
data,
)
.await
{
Ok(s) => s,
Err(e) => {
tracing::warn!(error = %e, "Move: failed to prepare re-follow");
continue;
}
};
for send in sends {
if let Err(e) = send.sign_and_send(data).await {
tracing::warn!(error = %e, %local_user_id, "Move: re-follow delivery failed");
}
}
}
tracing::info!( tracing::info!(
actor = %self.actor.inner(), actor = %self.actor.inner(),
target = %self.target, target = %self.target,
"received Move (account migration) — target noted" affected = affected_count,
"received Move — migrated follower relationships"
); );
Ok(()) Ok(())
} }

View File

@@ -131,4 +131,12 @@ pub trait FederationRepository: Send + Sync {
async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>>; async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>>;
async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool>; async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool>;
/// Migrate all local following records from old_actor_url to new_actor_url.
/// Returns the local user IDs whose records were migrated (excludes users
/// already following the new actor — they need no re-follow).
async fn migrate_follower_actor(
&self,
old_actor_url: &str,
new_actor_url: &str,
) -> Result<Vec<uuid::Uuid>>;
} }

View File

@@ -330,6 +330,34 @@ impl ActivityPubService {
Ok(serde_json::to_string(&WithContext::new_default(person))?) Ok(serde_json::to_string(&WithContext::new_default(person))?)
} }
/// Mark a remote follower as accepted in the DB only — no AP activity is sent.
/// The caller is responsible for delivering the Accept activity separately.
pub async fn mark_follower_accepted(
&self,
user_id: uuid::Uuid,
actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data();
data.federation_repo
.update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted)
.await
.map_err(|e| anyhow::anyhow!("{e}"))
}
/// Remove a remote follower from the DB only — no AP activity is sent.
/// The caller is responsible for delivering the Reject activity separately.
pub async fn mark_follower_rejected(
&self,
user_id: uuid::Uuid,
actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data();
data.federation_repo
.remove_follower(user_id, actor_url)
.await
.map_err(|e| anyhow::anyhow!("{e}"))
}
/// Resolve a `@user@domain` handle to actor data using a signed HTTP request. /// Resolve a `@user@domain` handle to actor data using a signed HTTP request.
/// Unlike a plain unauthenticated fetch, this works with instances (e.g. Threads) /// Unlike a plain unauthenticated fetch, this works with instances (e.g. Threads)
/// that require HTTP signatures before returning full actor JSON. /// that require HTTP signatures before returning full actor JSON.
@@ -1203,6 +1231,74 @@ impl ActivityPubService {
Ok(()) Ok(())
} }
/// Broadcast a Move activity to all accepted followers, signalling that this
/// actor is migrating to `new_actor_url`.
///
/// **Pre-condition (caller's responsibility):**
/// Before calling this, the application must persist `also_known_as = [new_actor_url]`
/// in the local actor's row so the old actor JSON already advertises the new URL
/// when remote servers fetch it to verify the cross-reference.
pub async fn broadcast_move(
&self,
user_id: uuid::Uuid,
new_actor_url: url::Url,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let followers = data.federation_repo.get_followers(user_id).await?;
let accepted: Vec<_> = followers
.into_iter()
.filter(|f| f.status == FollowerStatus::Accepted)
.collect();
if accepted.is_empty() {
tracing::info!(
%user_id,
"broadcast_move: no accepted followers, nothing to send"
);
return Ok(());
}
let inboxes = collect_inboxes(&accepted);
let move_id =
crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
let move_activity = crate::activities::MoveActivity {
id: move_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object: local_actor.ap_id.clone(),
target: new_actor_url.clone(),
};
let sends = SendActivityTask::prepare(
&WithContext::new_default(move_activity),
&local_actor,
inboxes,
&data,
)
.await?;
let failures = send_with_retry(sends, &data).await;
if !failures.is_empty() {
tracing::warn!(
count = failures.len(),
"some Move deliveries failed permanently"
);
}
tracing::info!(
%user_id,
target = %new_actor_url,
"broadcast_move: delivered to all accepted followers"
);
Ok(())
}
pub async fn block_actor( pub async fn block_actor(
&self, &self,
local_user_id: uuid::Uuid, local_user_id: uuid::Uuid,