export feature
This commit is contained in:
@@ -6,12 +6,12 @@ use activitypub_federation::{
|
||||
protocol::context::WithContext,
|
||||
traits::Actor,
|
||||
};
|
||||
use axum::{routing::get, routing::post, Router};
|
||||
use axum::{Router, routing::get, routing::post};
|
||||
use url::Url;
|
||||
|
||||
use crate::{
|
||||
activities::{AcceptActivity, CreateActivity, FollowActivity, RejectActivity, UndoActivity},
|
||||
actors::{get_local_actor, DbActor},
|
||||
actors::{DbActor, get_local_actor},
|
||||
content::ApObjectHandler,
|
||||
data::FederationData,
|
||||
federation::ApFederationConfig,
|
||||
@@ -19,8 +19,8 @@ use crate::{
|
||||
inbox::inbox_handler,
|
||||
outbox::outbox_handler,
|
||||
repository::{FederationRepository, FollowerStatus, FollowingStatus, RemoteActor},
|
||||
user::ApUserRepository,
|
||||
urls::activity_url,
|
||||
user::ApUserRepository,
|
||||
webfinger::webfinger_handler,
|
||||
};
|
||||
|
||||
@@ -64,7 +64,10 @@ impl ActivityPubService {
|
||||
) -> anyhow::Result<Self> {
|
||||
let data = FederationData::new(repo, user_repo, object_handler, base_url.clone());
|
||||
let federation_config = ApFederationConfig::new(data, debug).await?;
|
||||
Ok(Self { federation_config, base_url })
|
||||
Ok(Self {
|
||||
federation_config,
|
||||
base_url,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn federation_config(&self) -> &ApFederationConfig {
|
||||
@@ -82,7 +85,9 @@ impl ActivityPubService {
|
||||
let actor = get_local_actor(uuid, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let person = actor.into_json(&data).await
|
||||
let person = actor
|
||||
.into_json(&data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
Ok(serde_json::to_string(&WithContext::new_default(person))?)
|
||||
}
|
||||
@@ -133,7 +138,10 @@ impl ActivityPubService {
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some activity deliveries failed permanently");
|
||||
tracing::warn!(
|
||||
count = failures.len(),
|
||||
"some activity deliveries failed permanently"
|
||||
);
|
||||
}
|
||||
|
||||
let remote = RemoteActor {
|
||||
@@ -150,11 +158,17 @@ impl ActivityPubService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn unfollow(&self, local_user_id: uuid::Uuid, actor_url_str: &str) -> anyhow::Result<()> {
|
||||
pub async fn unfollow(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
actor_url_str: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
|
||||
if actor_url_str.starts_with(&self.base_url) {
|
||||
return self.unfollow_local(local_user_id, actor_url_str, &data).await;
|
||||
return self
|
||||
.unfollow_local(local_user_id, actor_url_str, &data)
|
||||
.await;
|
||||
}
|
||||
|
||||
let remote = data
|
||||
@@ -202,7 +216,10 @@ impl ActivityPubService {
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some activity deliveries failed permanently");
|
||||
tracing::warn!(
|
||||
count = failures.len(),
|
||||
"some activity deliveries failed permanently"
|
||||
);
|
||||
}
|
||||
|
||||
data.federation_repo
|
||||
@@ -236,7 +253,9 @@ impl ActivityPubService {
|
||||
.federation_repo
|
||||
.get_follower_follow_activity_id(local_user_id, remote_actor_url)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?;
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!("follow activity id not found for {}", remote_actor_url)
|
||||
})?;
|
||||
let follow_id = Url::parse(&follow_id_str)?;
|
||||
let follow = FollowActivity {
|
||||
id: follow_id,
|
||||
@@ -265,7 +284,9 @@ impl ActivityPubService {
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!("failed to deliver Accept activity, but follower is marked accepted locally");
|
||||
tracing::warn!(
|
||||
"failed to deliver Accept activity, but follower is marked accepted locally"
|
||||
);
|
||||
}
|
||||
|
||||
self.spawn_backfill(local_user_id, remote_actor.inbox_url.clone());
|
||||
@@ -313,7 +334,10 @@ impl ActivityPubService {
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some activity deliveries failed permanently");
|
||||
tracing::warn!(
|
||||
count = failures.len(),
|
||||
"some activity deliveries failed permanently"
|
||||
);
|
||||
}
|
||||
|
||||
data.federation_repo
|
||||
@@ -323,12 +347,20 @@ impl ActivityPubService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
|
||||
pub async fn get_pending_followers(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Vec<RemoteActor>> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
data.federation_repo.get_pending_followers(local_user_id).await
|
||||
data.federation_repo
|
||||
.get_pending_followers(local_user_id)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
|
||||
pub async fn get_accepted_followers(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Vec<RemoteActor>> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
Ok(followers
|
||||
@@ -338,13 +370,22 @@ impl ActivityPubService {
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<usize> {
|
||||
pub async fn count_accepted_followers(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
) -> anyhow::Result<usize> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
Ok(followers.into_iter().filter(|f| f.status == FollowerStatus::Accepted).count())
|
||||
Ok(followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.count())
|
||||
}
|
||||
|
||||
pub async fn get_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
|
||||
pub async fn get_following(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Vec<RemoteActor>> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
data.federation_repo.get_following(local_user_id).await
|
||||
}
|
||||
@@ -354,9 +395,15 @@ impl ActivityPubService {
|
||||
data.federation_repo.count_following(local_user_id).await
|
||||
}
|
||||
|
||||
pub async fn remove_follower(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> {
|
||||
pub async fn remove_follower(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
actor_url: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
data.federation_repo.remove_follower(local_user_id, actor_url).await
|
||||
data.federation_repo
|
||||
.remove_follower(local_user_id, actor_url)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Broadcast a single object to all accepted followers as a Create activity.
|
||||
@@ -395,10 +442,14 @@ impl ActivityPubService {
|
||||
.filter_map(|f| Url::parse(&f.actor.inbox_url).ok())
|
||||
.collect();
|
||||
|
||||
let sends = SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?;
|
||||
let sends =
|
||||
SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some activity deliveries failed permanently");
|
||||
tracing::warn!(
|
||||
count = failures.len(),
|
||||
"some activity deliveries failed permanently"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -423,10 +474,17 @@ impl ActivityPubService {
|
||||
let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string();
|
||||
let target_actor_url = crate::urls::actor_url(&self.base_url, target.id);
|
||||
let target_inbox_url = format!("{}/inbox", target_actor_url);
|
||||
let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?.to_string();
|
||||
let follow_id = activity_url(&self.base_url)
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?
|
||||
.to_string();
|
||||
|
||||
data.federation_repo
|
||||
.add_follower(target.id, &follower_actor_url, FollowerStatus::Accepted, &follow_id)
|
||||
.add_follower(
|
||||
target.id,
|
||||
&follower_actor_url,
|
||||
FollowerStatus::Accepted,
|
||||
&follow_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let target_as_remote = RemoteActor {
|
||||
@@ -441,7 +499,11 @@ impl ActivityPubService {
|
||||
.await?;
|
||||
|
||||
data.federation_repo
|
||||
.update_following_status(local_user_id, &target_actor_url.to_string(), FollowingStatus::Accepted)
|
||||
.update_following_status(
|
||||
local_user_id,
|
||||
&target_actor_url.to_string(),
|
||||
FollowingStatus::Accepted,
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!(follower = %local_user_id, followee = %target.id, "local follow");
|
||||
@@ -460,8 +522,12 @@ impl ActivityPubService {
|
||||
|
||||
let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string();
|
||||
|
||||
data.federation_repo.remove_follower(target_user_id, &local_actor_url).await?;
|
||||
data.federation_repo.remove_following(local_user_id, target_actor_url).await?;
|
||||
data.federation_repo
|
||||
.remove_follower(target_user_id, &local_actor_url)
|
||||
.await?;
|
||||
data.federation_repo
|
||||
.remove_following(local_user_id, target_actor_url)
|
||||
.await?;
|
||||
|
||||
tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow");
|
||||
Ok(())
|
||||
@@ -471,7 +537,14 @@ impl ActivityPubService {
|
||||
let config = self.federation_config.clone();
|
||||
let base_url = self.base_url.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = ActivityPubService::run_backfill(config, base_url, owner_user_id, follower_inbox_url).await {
|
||||
if let Err(e) = ActivityPubService::run_backfill(
|
||||
config,
|
||||
base_url,
|
||||
owner_user_id,
|
||||
follower_inbox_url,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, "backfill: task failed");
|
||||
}
|
||||
});
|
||||
@@ -491,7 +564,10 @@ impl ActivityPubService {
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let inbox = Url::parse(&follower_inbox_url)?;
|
||||
|
||||
let mut objects = data.object_handler.get_local_objects_for_user(owner_user_id).await?;
|
||||
let mut objects = data
|
||||
.object_handler
|
||||
.get_local_objects_for_user(owner_user_id)
|
||||
.await?;
|
||||
objects.reverse(); // oldest first → chronological feed
|
||||
|
||||
let total = objects.len();
|
||||
@@ -501,7 +577,9 @@ impl ActivityPubService {
|
||||
for chunk in objects.chunks(BATCH_SIZE) {
|
||||
for (ap_id, object_json) in chunk {
|
||||
// Use a stable Create activity ID derived from the object's ap_id
|
||||
let create_id = Url::parse(&format!("{}/activities/create/{}", base_url,
|
||||
let create_id = Url::parse(&format!(
|
||||
"{}/activities/create/{}",
|
||||
base_url,
|
||||
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes())
|
||||
))?;
|
||||
|
||||
@@ -517,7 +595,8 @@ impl ActivityPubService {
|
||||
&local_actor,
|
||||
vec![inbox.clone()],
|
||||
&data,
|
||||
).await?;
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if failures.is_empty() {
|
||||
success_count += 1;
|
||||
|
||||
Reference in New Issue
Block a user