4 Commits

5 changed files with 168 additions and 8 deletions

2
Cargo.lock generated
View File

@@ -1368,7 +1368,7 @@ dependencies = [
[[package]] [[package]]
name = "k-ap" name = "k-ap"
version = "0.1.6" version = "0.1.9"
dependencies = [ dependencies = [
"activitypub_federation", "activitypub_federation",
"anyhow", "anyhow",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "k-ap" name = "k-ap"
version = "0.1.7" version = "0.1.9"
edition = "2024" edition = "2024"
description = "Generic ActivityPub protocol layer" description = "Generic ActivityPub protocol layer"
license = "MIT" license = "MIT"

View File

@@ -65,12 +65,20 @@ impl Activity for FollowActivity {
))); )));
} }
}; };
if target_domain != data.domain { if target_domain == data.domain {
return Err(Error::bad_request(anyhow::anyhow!( return Ok(());
"follow target is not a local actor"
)));
} }
Ok(()) // Domain mismatch — still accept if the UUID resolves to a local user.
// This handles domain migrations where remote servers have cached the old actor URL.
if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url) {
if data.user_repo.find_by_id(uuid).await.ok().flatten().is_some() {
tracing::debug!(target = %target_url, local_domain = %data.domain, "accepting follow for migrated actor URL");
return Ok(());
}
}
Err(Error::bad_request(anyhow::anyhow!(
"follow target is not a local actor"
)))
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
@@ -829,10 +837,86 @@ impl Activity for MoveActivity {
if data.federation_repo.is_domain_blocked(domain).await? { if data.federation_repo.is_domain_blocked(domain).await? {
return Ok(()); return Ok(());
} }
// Fetch the target actor via signed request.
let target = ObjectId::<DbActor>::from(self.target.clone())
.dereference(data)
.await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
// Verify the new actor claims the old identity via alsoKnownAs.
let old_url = self.object.as_str();
if target.also_known_as.as_deref() != Some(old_url) {
return Err(Error::bad_request(anyhow::anyhow!(
"Move target alsoKnownAs does not reference old actor"
)));
}
// Migrate DB records; get user IDs that need a re-follow.
let affected = data
.federation_repo
.migrate_follower_actor(old_url, self.target.as_str())
.await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
let affected_count = affected.len();
// Re-follow on behalf of each affected local user.
for local_user_id in &affected {
let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await {
Ok(a) => a,
Err(e) => {
tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor for re-follow");
continue;
}
};
let follow_id = match crate::urls::activity_url(&data.base_url) {
Ok(u) => u,
Err(e) => {
tracing::warn!(error = %e, "Move: failed to generate follow activity URL");
continue;
}
};
let follow = FollowActivity {
id: follow_id,
kind: Default::default(),
actor: activitypub_federation::fetch::object_id::ObjectId::from(
local_actor.ap_id.clone(),
),
object: activitypub_federation::fetch::object_id::ObjectId::from(
self.target.clone(),
),
};
let sends = match activitypub_federation::activity_sending::SendActivityTask::prepare(
&activitypub_federation::protocol::context::WithContext::new_default(follow),
&local_actor,
vec![target.inbox_url.clone()],
data,
)
.await
{
Ok(s) => s,
Err(e) => {
tracing::warn!(error = %e, "Move: failed to prepare re-follow");
continue;
}
};
for send in sends {
if let Err(e) = send.sign_and_send(data).await {
tracing::warn!(error = %e, %local_user_id, "Move: re-follow delivery failed");
}
}
}
tracing::info!( tracing::info!(
actor = %self.actor.inner(), actor = %self.actor.inner(),
target = %self.target, target = %self.target,
"received Move (account migration) — target noted" affected = affected_count,
"received Move — migrated follower relationships"
); );
Ok(()) Ok(())
} }

View File

@@ -131,4 +131,12 @@ pub trait FederationRepository: Send + Sync {
async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>>; async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>>;
async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool>; async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool>;
/// Migrate all local following records from old_actor_url to new_actor_url.
/// Returns the local user IDs whose records were migrated (excludes users
/// already following the new actor — they need no re-follow).
async fn migrate_follower_actor(
&self,
old_actor_url: &str,
new_actor_url: &str,
) -> Result<Vec<uuid::Uuid>>;
} }

View File

@@ -1203,6 +1203,74 @@ impl ActivityPubService {
Ok(()) Ok(())
} }
/// Broadcast a Move activity to all accepted followers, signalling that this
/// actor is migrating to `new_actor_url`.
///
/// **Pre-condition (caller's responsibility):**
/// Before calling this, the application must persist `also_known_as = [new_actor_url]`
/// in the local actor's row so the old actor JSON already advertises the new URL
/// when remote servers fetch it to verify the cross-reference.
pub async fn broadcast_move(
&self,
user_id: uuid::Uuid,
new_actor_url: url::Url,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let followers = data.federation_repo.get_followers(user_id).await?;
let accepted: Vec<_> = followers
.into_iter()
.filter(|f| f.status == FollowerStatus::Accepted)
.collect();
if accepted.is_empty() {
tracing::info!(
%user_id,
"broadcast_move: no accepted followers, nothing to send"
);
return Ok(());
}
let inboxes = collect_inboxes(&accepted);
let move_id =
crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
let move_activity = crate::activities::MoveActivity {
id: move_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object: local_actor.ap_id.clone(),
target: new_actor_url.clone(),
};
let sends = SendActivityTask::prepare(
&WithContext::new_default(move_activity),
&local_actor,
inboxes,
&data,
)
.await?;
let failures = send_with_retry(sends, &data).await;
if !failures.is_empty() {
tracing::warn!(
count = failures.len(),
"some Move deliveries failed permanently"
);
}
tracing::info!(
%user_id,
target = %new_actor_url,
"broadcast_move: delivered to all accepted followers"
);
Ok(())
}
pub async fn block_actor( pub async fn block_actor(
&self, &self,
local_user_id: uuid::Uuid, local_user_id: uuid::Uuid,