refactor(activities): split into per-activity files with check_guards DRY helper

This commit is contained in:
2026-05-29 00:17:31 +02:00
parent 7ccc18e85c
commit 90a0d91b39
15 changed files with 1040 additions and 1083 deletions

59
src/activities/accept.rs Normal file
View File

@@ -0,0 +1,59 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::AcceptType,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use crate::repository::FollowingStatus;
use super::follow::FollowActivity;
use super::helpers::check_guards;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AcceptActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: AcceptType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: FollowActivity,
}
#[async_trait::async_trait]
impl Activity for AcceptActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if self.actor.inner() != self.object.object.inner() {
return Err(Error::bad_request(anyhow::anyhow!("Accept actor does not match Follow target")));
}
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner())
.ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?;
data.federation_repo
.update_following_status(
local_user_id,
self.actor.inner().as_str(),
FollowingStatus::Accepted,
)
.await?;
tracing::info!(remote_actor = %self.actor.inner(), "follow accepted by remote");
Ok(())
}
}

66
src/activities/add.rs Normal file
View File

@@ -0,0 +1,66 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::helpers::check_guards;
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(rename = "Add")]
pub struct AddType;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AddActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: AddType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: serde_json::Value,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) to: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) cc: Vec<String>,
}
#[async_trait::async_trait]
impl Activity for AddActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str())
&& let Ok(attributed_url) = Url::parse(attributed_to)
&& &attributed_url != self.actor.inner()
{
return Err(Error::bad_request(anyhow::anyhow!(
"Add actor does not match object attributedTo"
)));
}
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
let ap_id = self.id.clone();
let actor_url = self.actor.inner().clone();
data.object_handler
.on_create(&ap_id, &actor_url, self.object)
.await
.map_err(|e| Error::from(anyhow::anyhow!(e)))?;
tracing::info!(actor = %actor_url, "received Add activity");
Ok(())
}
}

View File

@@ -0,0 +1,75 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
protocol::verification::verify_domains_match,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::helpers::check_guards;
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(rename = "Announce")]
pub struct AnnounceType;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AnnounceActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: AnnounceType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: Url,
pub(crate) published: Option<chrono::DateTime<chrono::Utc>>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) to: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) cc: Vec<String>,
}
#[async_trait::async_trait]
impl Activity for AnnounceActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
verify_domains_match(&self.id, self.actor.inner())?;
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
if self.object.host_str().unwrap_or("") != data.domain {
data.object_handler
.on_announce_of_remote(&self.object, self.actor.inner())
.await
.unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process cross-server announce"));
tracing::debug!(actor = %self.actor.inner(), object = %self.object, "received Announce of non-local object");
return Ok(());
}
data.federation_repo
.add_announce(
self.id.as_str(),
self.object.as_str(),
self.actor.inner().as_str(),
self.published.unwrap_or_else(chrono::Utc::now),
)
.await?;
data.object_handler
.on_announce_received(&self.object, self.actor.inner())
.await
.unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process announce notification"));
tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce");
Ok(())
}
}

54
src/activities/block.rs Normal file
View File

@@ -0,0 +1,54 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
protocol::verification::verify_domains_match,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::helpers::check_guards;
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(rename = "Block")]
pub struct BlockType;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BlockActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: BlockType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: Url,
}
#[async_trait::async_trait]
impl Activity for BlockActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
verify_domains_match(&self.id, self.actor.inner())?;
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) {
let _ = data.federation_repo.remove_following(local_user_id, self.actor.inner().as_str()).await;
let _ = data.federation_repo.remove_follower(local_user_id, self.actor.inner().as_str()).await;
}
tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower");
Ok(())
}
}

73
src/activities/create.rs Normal file
View File

@@ -0,0 +1,73 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::CreateType,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::helpers::{check_guards, extract_and_dispatch_mentions};
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: CreateType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: serde_json::Value,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) to: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) cc: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) bto: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) bcc: Vec<String>,
}
#[async_trait::async_trait]
impl Activity for CreateActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str())
&& let Ok(attributed_url) = Url::parse(attributed_to)
&& &attributed_url != self.actor.inner()
{
return Err(Error::bad_request(anyhow::anyhow!(
"Create actor does not match object attributedTo"
)));
}
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
let ap_id = self
.object
.get("id")
.and_then(|v| v.as_str())
.and_then(|s| Url::parse(s).ok())
.unwrap_or_else(|| self.id.clone());
let actor_url = self.actor.inner().clone();
extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await;
data.object_handler
.on_create(&ap_id, &actor_url, self.object)
.await
.map_err(|e| Error::from(anyhow::anyhow!(e)))?;
tracing::info!(actor = %actor_url, "received create activity");
Ok(())
}
}

94
src/activities/delete.rs Normal file
View File

@@ -0,0 +1,94 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::DeleteType,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::helpers::check_guards;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DeleteActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: DeleteType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: serde_json::Value,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) to: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) cc: Vec<String>,
}
#[async_trait::async_trait]
impl Activity for DeleteActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let actor_domain = self.actor.inner().host_str().unwrap_or("");
let object_domain = match &self.object {
serde_json::Value::String(s) => Url::parse(s)
.ok()
.and_then(|u| u.host_str().map(|h| h.to_string()))
.unwrap_or_default(),
serde_json::Value::Object(o) => o
.get("id")
.and_then(|v| v.as_str())
.and_then(|s| Url::parse(s).ok())
.and_then(|u| u.host_str().map(|h| h.to_string()))
.unwrap_or_default(),
_ => String::new(),
};
if !object_domain.is_empty() && actor_domain != object_domain {
return Err(Error::bad_request(anyhow::anyhow!(
"Delete actor domain does not match object domain"
)));
}
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
let actor_url = self.actor.inner().clone();
let object_url_str = match &self.object {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Object(o) => o
.get("id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_default(),
_ => String::new(),
};
let Ok(object_url) = Url::parse(&object_url_str) else {
tracing::warn!(actor = %actor_url, "Delete has unparseable object, ignoring");
return Ok(());
};
if object_url == *self.actor.inner() {
data.object_handler
.on_actor_removed(&actor_url)
.await
.map_err(|e| Error::from(anyhow::anyhow!(e)))?;
tracing::info!(actor = %actor_url, "received Delete(actor) — remote account deleted");
return Ok(());
}
data.object_handler
.on_delete(&object_url, &actor_url)
.await
.map_err(|e| Error::from(anyhow::anyhow!(e)))?;
tracing::info!(object = %object_url, "received Delete(note)");
Ok(())
}
}

85
src/activities/follow.rs Normal file
View File

@@ -0,0 +1,85 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::FollowType,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use crate::repository::FollowerStatus;
use super::helpers::check_guards;
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FollowActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: FollowType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: ObjectId<DbActor>,
}
#[async_trait::async_trait]
impl Activity for FollowActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let target_url = self.object.inner();
let target_domain = match (target_url.host_str(), target_url.port()) {
(Some(host), Some(port)) => format!("{}:{}", host, port),
(Some(host), None) => host.to_string(),
_ => return Err(Error::bad_request(anyhow::anyhow!("invalid follow target URL"))),
};
if target_domain == data.domain {
return Ok(());
}
if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url) {
if data.user_repo.find_by_id(uuid).await.ok().flatten().is_some() {
tracing::debug!(target = %target_url, "accepting follow for migrated actor URL");
return Ok(());
}
}
Err(Error::bad_request(anyhow::anyhow!("follow target is not a local actor")))
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
// Actor block checked BEFORE any outbound HTTP fetch.
if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) {
if data.federation_repo
.is_actor_blocked(target_user_id, self.actor.inner().as_str())
.await?
{
tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor");
return Ok(());
}
}
let _follower = self.actor.dereference(data).await?;
let local_actor = self.object.dereference(data).await?;
data.federation_repo
.add_follower(
local_actor.user_id,
self.actor.inner().as_str(),
FollowerStatus::Pending,
self.id.as_str(),
)
.await?;
tracing::info!(
follower = %self.actor.inner(),
local_user = %local_actor.user_id,
"follow request pending approval"
);
Ok(())
}
}

84
src/activities/helpers.rs Normal file
View File

@@ -0,0 +1,84 @@
use activitypub_federation::config::Data;
use url::Url;
use crate::data::FederationData;
use crate::error::Error;
/// Returns `true` if the activity was already processed.
/// Marks it processed before returning `false`.
/// On repo error, skips the check rather than silently dropping the activity.
pub(crate) async fn already_processed(activity_id: &Url, data: &Data<FederationData>) -> bool {
let id = activity_id.as_str();
match data.federation_repo.is_activity_processed(id).await {
Ok(true) => {
tracing::debug!(activity_id = id, "duplicate activity, skipping");
true
}
Ok(false) => {
if let Err(e) = data.federation_repo.mark_activity_processed(id).await {
tracing::warn!(activity_id = id, error = %e, "failed to mark activity processed");
}
false
}
Err(e) => {
tracing::warn!(error = %e, "idempotency check failed, processing anyway");
false
}
}
}
/// Returns `true` when the activity should be skipped:
/// already processed, or the sender's domain is blocked.
/// Call this at the top of every `receive()` impl.
pub(crate) async fn check_guards(
id: &Url,
actor: &Url,
data: &Data<FederationData>,
) -> Result<bool, Error> {
if already_processed(id, data).await {
return Ok(true);
}
let domain = actor.host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %actor, "ignoring activity from blocked domain");
return Ok(true);
}
Ok(false)
}
/// Parse `object["tag"]` for `Mention` entries and notify each tagged local user.
/// Failures are logged and never propagated — a broken mention must not fail the activity.
pub(crate) async fn extract_and_dispatch_mentions(
ap_id: &Url,
actor_url: &Url,
object: &serde_json::Value,
data: &Data<FederationData>,
) {
let Some(tags) = object.get("tag").and_then(|t| t.as_array()) else {
return;
};
for tag in tags {
if tag.get("type").and_then(|v| v.as_str()) != Some("Mention") {
continue;
}
let Some(href) = tag.get("href").and_then(|v| v.as_str()) else {
continue;
};
let Ok(href_url) = Url::parse(href) else { continue };
let Some(mentioned_user_id) = crate::urls::extract_user_id_from_url(&href_url) else {
continue;
};
if let Err(e) = data
.object_handler
.on_mention(ap_id, mentioned_user_id, actor_url)
.await
{
tracing::warn!(
ap_id = %ap_id,
mentioned_user = %mentioned_user_id,
error = %e,
"failed to dispatch mention notification"
);
}
}
}

61
src/activities/like.rs Normal file
View File

@@ -0,0 +1,61 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
protocol::verification::verify_domains_match,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::helpers::check_guards;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename = "Like")]
pub struct LikeType;
impl Default for LikeType {
fn default() -> Self { Self }
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeActivity {
pub id: Url,
#[serde(rename = "type")]
pub kind: LikeType,
pub actor: ObjectId<DbActor>,
pub object: Url,
}
#[async_trait::async_trait]
impl Activity for LikeActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
verify_domains_match(&self.id, self.actor.inner())?;
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
if self.object.host_str().unwrap_or("") != data.domain {
return Ok(());
}
data.object_handler
.on_like(&self.object, self.actor.inner())
.await
.map_err(|e| Error::from(anyhow::anyhow!(e)))?;
tracing::info!(actor = %self.actor.inner(), object = %self.object, "received like");
Ok(())
}
}

60
src/activities/mod.rs Normal file
View File

@@ -0,0 +1,60 @@
mod accept;
mod add;
mod announce;
mod block;
mod create;
mod delete;
mod follow;
pub(crate) mod helpers;
mod like;
mod move_act;
mod reject;
mod undo;
mod update;
pub use accept::AcceptActivity;
pub use add::{AddActivity, AddType};
pub use announce::{AnnounceActivity, AnnounceType};
pub use block::{BlockActivity, BlockType};
pub use create::CreateActivity;
pub use delete::DeleteActivity;
pub use follow::FollowActivity;
pub use like::{LikeActivity, LikeType};
pub use move_act::{MoveActivity, MoveType};
pub use reject::RejectActivity;
pub use undo::UndoActivity;
pub use update::UpdateActivity;
use activitypub_federation::config::Data;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "type")]
#[enum_delegate::implement(activitypub_federation::traits::Activity)]
pub enum InboxActivities {
#[serde(rename = "Follow")]
Follow(FollowActivity),
#[serde(rename = "Accept")]
Accept(AcceptActivity),
#[serde(rename = "Reject")]
Reject(RejectActivity),
#[serde(rename = "Undo")]
Undo(UndoActivity),
#[serde(rename = "Create")]
Create(CreateActivity),
#[serde(rename = "Delete")]
Delete(DeleteActivity),
#[serde(rename = "Update")]
Update(UpdateActivity),
#[serde(rename = "Announce")]
Announce(AnnounceActivity),
#[serde(rename = "Add")]
Add(AddActivity),
#[serde(rename = "Block")]
Block(BlockActivity),
#[serde(rename = "Like")]
Like(LikeActivity),
#[serde(rename = "Move")]
Move(MoveActivity),
}

105
src/activities/move_act.rs Normal file
View File

@@ -0,0 +1,105 @@
use activitypub_federation::{
activity_sending::SendActivityTask,
config::Data,
fetch::object_id::ObjectId,
protocol::context::WithContext,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::follow::FollowActivity;
use super::helpers::check_guards;
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(rename = "Move")]
pub struct MoveType;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct MoveActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: MoveType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: Url,
pub(crate) target: Url,
}
#[async_trait::async_trait]
impl Activity for MoveActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if &self.object != self.actor.inner() {
return Err(Error::bad_request(anyhow::anyhow!("Move object must be the actor itself")));
}
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
let target = ObjectId::<DbActor>::from(self.target.clone())
.dereference(data)
.await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
if target.also_known_as.as_deref() != Some(self.object.as_str()) {
return Err(Error::bad_request(anyhow::anyhow!(
"Move target alsoKnownAs does not reference old actor"
)));
}
let affected = data
.federation_repo
.migrate_follower_actor(self.object.as_str(), self.target.as_str())
.await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;
let affected_count = affected.len();
for local_user_id in &affected {
let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await {
Ok(a) => a,
Err(e) => { tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor"); continue; }
};
let follow_id = match crate::urls::activity_url(&data.base_url) {
Ok(u) => u,
Err(e) => { tracing::warn!(error = %e, "Move: failed to generate follow activity URL"); continue; }
};
let follow = FollowActivity {
id: follow_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object: ObjectId::from(self.target.clone()),
};
let sends = match SendActivityTask::prepare(
&WithContext::new_default(follow),
&local_actor,
vec![target.inbox_url.clone()],
data,
).await {
Ok(s) => s,
Err(e) => { tracing::warn!(error = %e, "Move: failed to prepare re-follow"); continue; }
};
for send in sends {
if let Err(e) = send.sign_and_send(data).await {
tracing::warn!(error = %e, %local_user_id, "Move: re-follow delivery failed");
}
}
}
tracing::info!(
actor = %self.actor.inner(),
target = %self.target,
affected = affected_count,
"received Move — migrated follower relationships"
);
Ok(())
}
}

54
src/activities/reject.rs Normal file
View File

@@ -0,0 +1,54 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::RejectType,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::follow::FollowActivity;
use super::helpers::check_guards;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RejectActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: RejectType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: FollowActivity,
}
#[async_trait::async_trait]
impl Activity for RejectActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if self.actor.inner() != self.object.object.inner() {
return Err(Error::bad_request(anyhow::anyhow!("Reject actor does not match Follow target")));
}
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) {
data.federation_repo
.remove_following(user_id, self.actor.inner().as_str())
.await?;
}
tracing::info!(actor = %self.actor.inner(), "follow rejected");
Ok(())
}
}

101
src/activities/undo.rs Normal file
View File

@@ -0,0 +1,101 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::UndoType,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::helpers::check_guards;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UndoActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: UndoType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: serde_json::Value,
}
#[async_trait::async_trait]
impl Activity for UndoActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if let Some(inner_actor) = self.object.get("actor").and_then(|v| v.as_str())
&& inner_actor != self.actor.inner().as_str()
{
return Err(Error::bad_request(anyhow::anyhow!(
"Undo actor does not match inner activity actor"
)));
}
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
let obj_type = self.object.get("type").and_then(|t| t.as_str()).unwrap_or("");
match obj_type {
"Follow" => {
if let Some(obj_url) = self.object.get("object").and_then(|o| o.as_str())
&& let Ok(url) = Url::parse(obj_url)
&& let Some(user_id) = crate::urls::extract_user_id_from_url(&url)
{
data.federation_repo
.remove_follower(user_id, self.actor.inner().as_str())
.await?;
}
data.object_handler
.on_actor_removed(self.actor.inner())
.await
.map_err(|e| Error::from(anyhow::anyhow!(e)))?;
tracing::info!(actor = %self.actor.inner(), "unfollowed");
}
"Add" => {
let ap_id_str = self
.object
.get("object")
.and_then(|o| o.get("id"))
.and_then(|id| id.as_str())
.or_else(|| self.object.get("id").and_then(|id| id.as_str()));
if let Some(ap_id_str) = ap_id_str
&& let Ok(ap_id) = Url::parse(ap_id_str)
{
data.object_handler
.on_delete(&ap_id, self.actor.inner())
.await
.map_err(|e| Error::from(anyhow::anyhow!(e)))?;
tracing::info!(ap_id = %ap_id_str, "undo Add (watchlist remove)");
}
}
"Like" => {
if let Some(obj_url_str) = self.object.get("object").and_then(|o| o.as_str())
&& let Ok(obj_url) = Url::parse(obj_url_str)
&& obj_url.host_str().unwrap_or("") == data.domain
{
data.object_handler
.on_unlike(&obj_url, self.actor.inner())
.await
.unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process unlike"));
}
tracing::info!(actor = %self.actor.inner(), "received Undo(Like)");
}
other => {
tracing::debug!(kind = %other, "ignoring Undo of unknown activity type");
}
}
Ok(())
}
}

69
src/activities/update.rs Normal file
View File

@@ -0,0 +1,69 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::UpdateType,
traits::Activity,
};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use super::helpers::{check_guards, extract_and_dispatch_mentions};
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateActivity {
pub(crate) id: Url,
#[serde(rename = "type", default)]
pub(crate) kind: UpdateType,
pub(crate) actor: ObjectId<DbActor>,
pub(crate) object: serde_json::Value,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) to: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) cc: Vec<String>,
}
#[async_trait::async_trait]
impl Activity for UpdateActivity {
type DataType = FederationData;
type Error = Error;
fn id(&self) -> &Url { &self.id }
fn actor(&self) -> &Url { self.actor.inner() }
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str())
&& let Ok(attributed_url) = Url::parse(attributed_to)
&& &attributed_url != self.actor.inner()
{
return Err(Error::bad_request(anyhow::anyhow!(
"Update actor does not match object attributedTo"
)));
}
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(());
}
let ap_id = self
.object
.get("id")
.and_then(|v| v.as_str())
.and_then(|s| Url::parse(s).ok())
.unwrap_or_else(|| self.id.clone());
let actor_url = self.actor.inner().clone();
extract_and_dispatch_mentions(&ap_id, &actor_url, &self.object, data).await;
data.object_handler
.on_update(&ap_id, &actor_url, self.object)
.await
.map_err(|e| Error::from(anyhow::anyhow!(e)))?;
tracing::info!(actor = %actor_url, "received update activity");
Ok(())
}
}