Compare commits
6 Commits
v0.1.5
...
432f39cbb4
| Author | SHA1 | Date | |
|---|---|---|---|
| 432f39cbb4 | |||
| 2c509cbf88 | |||
| 52614d406a | |||
| 1949fce620 | |||
| 699258f830 | |||
| 9412a9739a |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1368,7 +1368,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "k-ap"
|
name = "k-ap"
|
||||||
version = "0.1.4"
|
version = "0.1.9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"activitypub_federation",
|
"activitypub_federation",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "k-ap"
|
name = "k-ap"
|
||||||
version = "0.1.5"
|
version = "0.1.9"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
description = "Generic ActivityPub protocol layer"
|
description = "Generic ActivityPub protocol layer"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|||||||
@@ -65,12 +65,20 @@ impl Activity for FollowActivity {
|
|||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if target_domain != data.domain {
|
if target_domain == data.domain {
|
||||||
return Err(Error::bad_request(anyhow::anyhow!(
|
return Ok(());
|
||||||
"follow target is not a local actor"
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
Ok(())
|
// Domain mismatch — still accept if the UUID resolves to a local user.
|
||||||
|
// This handles domain migrations where remote servers have cached the old actor URL.
|
||||||
|
if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url) {
|
||||||
|
if data.user_repo.find_by_id(uuid).await.ok().flatten().is_some() {
|
||||||
|
tracing::debug!(target = %target_url, local_domain = %data.domain, "accepting follow for migrated actor URL");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(Error::bad_request(anyhow::anyhow!(
|
||||||
|
"follow target is not a local actor"
|
||||||
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||||
@@ -829,10 +837,86 @@ impl Activity for MoveActivity {
|
|||||||
if data.federation_repo.is_domain_blocked(domain).await? {
|
if data.federation_repo.is_domain_blocked(domain).await? {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fetch the target actor via signed request.
|
||||||
|
let target = ObjectId::<DbActor>::from(self.target.clone())
|
||||||
|
.dereference(data)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
|
||||||
|
|
||||||
|
// Verify the new actor claims the old identity via alsoKnownAs.
|
||||||
|
let old_url = self.object.as_str();
|
||||||
|
if target.also_known_as.as_deref() != Some(old_url) {
|
||||||
|
return Err(Error::bad_request(anyhow::anyhow!(
|
||||||
|
"Move target alsoKnownAs does not reference old actor"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migrate DB records; get user IDs that need a re-follow.
|
||||||
|
let affected = data
|
||||||
|
.federation_repo
|
||||||
|
.migrate_follower_actor(old_url, self.target.as_str())
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
|
||||||
|
|
||||||
|
let affected_count = affected.len();
|
||||||
|
|
||||||
|
// Re-follow on behalf of each affected local user.
|
||||||
|
for local_user_id in &affected {
|
||||||
|
let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor for re-follow");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let follow_id = match crate::urls::activity_url(&data.base_url) {
|
||||||
|
Ok(u) => u,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "Move: failed to generate follow activity URL");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let follow = FollowActivity {
|
||||||
|
id: follow_id,
|
||||||
|
kind: Default::default(),
|
||||||
|
actor: activitypub_federation::fetch::object_id::ObjectId::from(
|
||||||
|
local_actor.ap_id.clone(),
|
||||||
|
),
|
||||||
|
object: activitypub_federation::fetch::object_id::ObjectId::from(
|
||||||
|
self.target.clone(),
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
let sends = match activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||||
|
&activitypub_federation::protocol::context::WithContext::new_default(follow),
|
||||||
|
&local_actor,
|
||||||
|
vec![target.inbox_url.clone()],
|
||||||
|
data,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "Move: failed to prepare re-follow");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for send in sends {
|
||||||
|
if let Err(e) = send.sign_and_send(data).await {
|
||||||
|
tracing::warn!(error = %e, %local_user_id, "Move: re-follow delivery failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
actor = %self.actor.inner(),
|
actor = %self.actor.inner(),
|
||||||
target = %self.target,
|
target = %self.target,
|
||||||
"received Move (account migration) — target noted"
|
affected = affected_count,
|
||||||
|
"received Move — migrated follower relationships"
|
||||||
);
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -193,6 +193,11 @@ pub async fn get_local_actor(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn apex_domain(url: &Url) -> String {
|
||||||
|
let host = url.host_str().unwrap_or("");
|
||||||
|
host.strip_prefix("www.").unwrap_or(host).to_owned()
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl Object for DbActor {
|
impl Object for DbActor {
|
||||||
type DataType = FederationData;
|
type DataType = FederationData;
|
||||||
@@ -319,11 +324,26 @@ impl Object for DbActor {
|
|||||||
expected_domain: &Url,
|
expected_domain: &Url,
|
||||||
_data: &Data<Self::DataType>,
|
_data: &Data<Self::DataType>,
|
||||||
) -> Result<(), Self::Error> {
|
) -> Result<(), Self::Error> {
|
||||||
verify_domains_match(json.id.inner(), expected_domain)?;
|
if verify_domains_match(json.id.inner(), expected_domain).is_ok() {
|
||||||
Ok(())
|
return Ok(());
|
||||||
|
}
|
||||||
|
if apex_domain(json.id.inner()) == apex_domain(expected_domain) {
|
||||||
|
tracing::debug!(
|
||||||
|
actor_id = %json.id.inner(),
|
||||||
|
expected = %expected_domain,
|
||||||
|
"domain verified via www-apex equivalence"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
verify_domains_match(json.id.inner(), expected_domain).map_err(Error::from)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
|
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
|
||||||
|
tracing::debug!(
|
||||||
|
actor_id = %json.id.inner(),
|
||||||
|
username = %json.preferred_username,
|
||||||
|
"ingesting remote actor"
|
||||||
|
);
|
||||||
let shared_inbox_url = json.endpoints.as_ref().map(|e| e.shared_inbox.to_string());
|
let shared_inbox_url = json.endpoints.as_ref().map(|e| e.shared_inbox.to_string());
|
||||||
let actor = RemoteActor {
|
let actor = RemoteActor {
|
||||||
url: json.id.inner().to_string(),
|
url: json.id.inner().to_string(),
|
||||||
|
|||||||
@@ -131,4 +131,12 @@ pub trait FederationRepository: Send + Sync {
|
|||||||
async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
|
async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
|
||||||
async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>>;
|
async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>>;
|
||||||
async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool>;
|
async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool>;
|
||||||
|
/// Migrate all local following records from old_actor_url to new_actor_url.
|
||||||
|
/// Returns the local user IDs whose records were migrated (excludes users
|
||||||
|
/// already following the new actor — they need no re-follow).
|
||||||
|
async fn migrate_follower_actor(
|
||||||
|
&self,
|
||||||
|
old_actor_url: &str,
|
||||||
|
new_actor_url: &str,
|
||||||
|
) -> Result<Vec<uuid::Uuid>>;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -337,10 +337,13 @@ impl ActivityPubService {
|
|||||||
&self,
|
&self,
|
||||||
handle: &str,
|
handle: &str,
|
||||||
) -> anyhow::Result<crate::user::LookedUpActor> {
|
) -> anyhow::Result<crate::user::LookedUpActor> {
|
||||||
|
tracing::info!(handle, "looking up remote actor");
|
||||||
let data = self.federation_config.to_request_data();
|
let data = self.federation_config.to_request_data();
|
||||||
let actor = Self::webfinger_https(handle, &data).await?;
|
let actor = Self::webfinger_https(handle, &data).await
|
||||||
|
.inspect_err(|e| tracing::warn!(handle, error = %e, "actor lookup failed"))?;
|
||||||
let domain = actor.ap_id.host_str().unwrap_or("").to_string();
|
let domain = actor.ap_id.host_str().unwrap_or("").to_string();
|
||||||
let handle = format!("{}@{}", actor.username, domain);
|
let handle = format!("{}@{}", actor.username, domain);
|
||||||
|
tracing::info!(handle, ap_url = %actor.ap_id, "remote actor resolved");
|
||||||
Ok(crate::user::LookedUpActor {
|
Ok(crate::user::LookedUpActor {
|
||||||
handle,
|
handle,
|
||||||
display_name: actor.display_name,
|
display_name: actor.display_name,
|
||||||
@@ -597,6 +600,7 @@ impl ActivityPubService {
|
|||||||
"https://{}/.well-known/webfinger?resource=acct:{}@{}",
|
"https://{}/.well-known/webfinger?resource=acct:{}@{}",
|
||||||
domain_str, user, domain_str
|
domain_str, user, domain_str
|
||||||
);
|
);
|
||||||
|
tracing::debug!(handle, wf_url, "resolving webfinger");
|
||||||
let wf: serde_json::Value = reqwest::Client::new()
|
let wf: serde_json::Value = reqwest::Client::new()
|
||||||
.get(&wf_url)
|
.get(&wf_url)
|
||||||
.header("Accept", "application/jrd+json, application/json")
|
.header("Accept", "application/jrd+json, application/json")
|
||||||
@@ -615,6 +619,7 @@ impl ActivityPubService {
|
|||||||
.and_then(|l| l["href"].as_str())
|
.and_then(|l| l["href"].as_str())
|
||||||
.ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))?
|
.ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))?
|
||||||
.to_owned();
|
.to_owned();
|
||||||
|
tracing::debug!(handle, self_href, "webfinger resolved, fetching actor with signature");
|
||||||
let self_url = url::Url::parse(&self_href)?;
|
let self_url = url::Url::parse(&self_href)?;
|
||||||
let actor: DbActor = ObjectId::from(self_url)
|
let actor: DbActor = ObjectId::from(self_url)
|
||||||
.dereference(data)
|
.dereference(data)
|
||||||
@@ -1198,6 +1203,74 @@ impl ActivityPubService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Broadcast a Move activity to all accepted followers, signalling that this
|
||||||
|
/// actor is migrating to `new_actor_url`.
|
||||||
|
///
|
||||||
|
/// **Pre-condition (caller's responsibility):**
|
||||||
|
/// Before calling this, the application must persist `also_known_as = [new_actor_url]`
|
||||||
|
/// in the local actor's row so the old actor JSON already advertises the new URL
|
||||||
|
/// when remote servers fetch it to verify the cross-reference.
|
||||||
|
pub async fn broadcast_move(
|
||||||
|
&self,
|
||||||
|
user_id: uuid::Uuid,
|
||||||
|
new_actor_url: url::Url,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let data = self.federation_config.to_request_data();
|
||||||
|
let local_actor = get_local_actor(user_id, &data)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||||
|
|
||||||
|
let followers = data.federation_repo.get_followers(user_id).await?;
|
||||||
|
let accepted: Vec<_> = followers
|
||||||
|
.into_iter()
|
||||||
|
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if accepted.is_empty() {
|
||||||
|
tracing::info!(
|
||||||
|
%user_id,
|
||||||
|
"broadcast_move: no accepted followers, nothing to send"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let inboxes = collect_inboxes(&accepted);
|
||||||
|
|
||||||
|
let move_id =
|
||||||
|
crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||||
|
|
||||||
|
let move_activity = crate::activities::MoveActivity {
|
||||||
|
id: move_id,
|
||||||
|
kind: Default::default(),
|
||||||
|
actor: ObjectId::from(local_actor.ap_id.clone()),
|
||||||
|
object: local_actor.ap_id.clone(),
|
||||||
|
target: new_actor_url.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let sends = SendActivityTask::prepare(
|
||||||
|
&WithContext::new_default(move_activity),
|
||||||
|
&local_actor,
|
||||||
|
inboxes,
|
||||||
|
&data,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let failures = send_with_retry(sends, &data).await;
|
||||||
|
if !failures.is_empty() {
|
||||||
|
tracing::warn!(
|
||||||
|
count = failures.len(),
|
||||||
|
"some Move deliveries failed permanently"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
%user_id,
|
||||||
|
target = %new_actor_url,
|
||||||
|
"broadcast_move: delivered to all accepted followers"
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn block_actor(
|
pub async fn block_actor(
|
||||||
&self,
|
&self,
|
||||||
local_user_id: uuid::Uuid,
|
local_user_id: uuid::Uuid,
|
||||||
|
|||||||
Reference in New Issue
Block a user