style: clippy fixes and linter formatting

This commit is contained in:
2026-05-29 01:52:14 +02:00
parent df6ff4c1e8
commit 73a68860c1
27 changed files with 1090 additions and 440 deletions

View File

@@ -1,8 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, kinds::activity::AcceptType, traits::Activity,
fetch::object_id::ObjectId,
kinds::activity::AcceptType,
traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -30,12 +27,18 @@ impl Activity for AcceptActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if self.actor.inner() != self.object.object.inner() { if self.actor.inner() != self.object.object.inner() {
return Err(Error::bad_request(anyhow::anyhow!("Accept actor does not match Follow target"))); return Err(Error::bad_request(anyhow::anyhow!(
"Accept actor does not match Follow target"
)));
} }
Ok(()) Ok(())
} }

View File

@@ -1,8 +1,4 @@
use activitypub_federation::{ use activitypub_federation::{config::Data, fetch::object_id::ObjectId, traits::Activity};
config::Data,
fetch::object_id::ObjectId,
traits::Activity,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -35,8 +31,12 @@ impl Activity for AddActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str())

View File

@@ -1,7 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, protocol::verification::verify_domains_match,
fetch::object_id::ObjectId,
protocol::verification::verify_domains_match,
traits::Activity, traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -37,8 +35,12 @@ impl Activity for AnnounceActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
verify_domains_match(&self.id, self.actor.inner())?; verify_domains_match(&self.id, self.actor.inner())?;
@@ -53,7 +55,9 @@ impl Activity for AnnounceActivity {
data.object_handler data.object_handler
.on_announce_of_remote(&self.object, self.actor.inner()) .on_announce_of_remote(&self.object, self.actor.inner())
.await .await
.unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process cross-server announce")); .unwrap_or_else(
|e| tracing::warn!(error = %e, "failed to process cross-server announce"),
);
tracing::debug!(actor = %self.actor.inner(), object = %self.object, "received Announce of non-local object"); tracing::debug!(actor = %self.actor.inner(), object = %self.object, "received Announce of non-local object");
return Ok(()); return Ok(());
} }
@@ -68,7 +72,9 @@ impl Activity for AnnounceActivity {
data.object_handler data.object_handler
.on_announce_received(&self.object, self.actor.inner()) .on_announce_received(&self.object, self.actor.inner())
.await .await
.unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process announce notification")); .unwrap_or_else(
|e| tracing::warn!(error = %e, "failed to process announce notification"),
);
tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce"); tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce");
Ok(()) Ok(())
} }

View File

@@ -1,7 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, protocol::verification::verify_domains_match,
fetch::object_id::ObjectId,
protocol::verification::verify_domains_match,
traits::Activity, traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -32,8 +30,12 @@ impl Activity for BlockActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
verify_domains_match(&self.id, self.actor.inner())?; verify_domains_match(&self.id, self.actor.inner())?;
@@ -45,8 +47,14 @@ impl Activity for BlockActivity {
return Ok(()); return Ok(());
} }
if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) {
let _ = data.follow_repo.remove_following(local_user_id, self.actor.inner().as_str()).await; let _ = data
let _ = data.follow_repo.remove_follower(local_user_id, self.actor.inner().as_str()).await; .follow_repo
.remove_following(local_user_id, self.actor.inner().as_str())
.await;
let _ = data
.follow_repo
.remove_follower(local_user_id, self.actor.inner().as_str())
.await;
} }
tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower"); tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower");
Ok(()) Ok(())

View File

@@ -1,8 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, kinds::activity::CreateType, traits::Activity,
fetch::object_id::ObjectId,
kinds::activity::CreateType,
traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -36,8 +33,12 @@ impl Activity for CreateActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str())

View File

@@ -1,8 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, kinds::activity::DeleteType, traits::Activity,
fetch::object_id::ObjectId,
kinds::activity::DeleteType,
traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -32,8 +29,12 @@ impl Activity for DeleteActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let actor_domain = self.actor.inner().host_str().unwrap_or(""); let actor_domain = self.actor.inner().host_str().unwrap_or("");

View File

@@ -1,8 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, kinds::activity::FollowType, traits::Activity,
fetch::object_id::ObjectId,
kinds::activity::FollowType,
traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -29,26 +26,42 @@ impl Activity for FollowActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let target_url = self.object.inner(); let target_url = self.object.inner();
let target_domain = match (target_url.host_str(), target_url.port()) { let target_domain = match (target_url.host_str(), target_url.port()) {
(Some(host), Some(port)) => format!("{}:{}", host, port), (Some(host), Some(port)) => format!("{}:{}", host, port),
(Some(host), None) => host.to_string(), (Some(host), None) => host.to_string(),
_ => return Err(Error::bad_request(anyhow::anyhow!("invalid follow target URL"))), _ => {
return Err(Error::bad_request(anyhow::anyhow!(
"invalid follow target URL"
)));
}
}; };
if target_domain == data.domain { if target_domain == data.domain {
return Ok(()); return Ok(());
} }
if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url) { if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url)
if data.user_repo.find_by_id(uuid).await.ok().flatten().is_some() { && data
.user_repo
.find_by_id(uuid)
.await
.ok()
.flatten()
.is_some()
{
tracing::debug!(target = %target_url, "accepting follow for migrated actor URL"); tracing::debug!(target = %target_url, "accepting follow for migrated actor URL");
return Ok(()); return Ok(());
} }
} Err(Error::bad_request(anyhow::anyhow!(
Err(Error::bad_request(anyhow::anyhow!("follow target is not a local actor"))) "follow target is not a local actor"
)))
} }
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
@@ -56,15 +69,15 @@ impl Activity for FollowActivity {
return Ok(()); return Ok(());
} }
// Actor block checked BEFORE any outbound HTTP fetch. // Actor block checked BEFORE any outbound HTTP fetch.
if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) { if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner())
if data.blocklist_repo && data
.blocklist_repo
.is_actor_blocked(target_user_id, self.actor.inner().as_str()) .is_actor_blocked(target_user_id, self.actor.inner().as_str())
.await? .await?
{ {
tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor"); tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor");
return Ok(()); return Ok(());
} }
}
let _follower = self.actor.dereference(data).await?; let _follower = self.actor.dereference(data).await?;
let local_actor = self.object.dereference(data).await?; let local_actor = self.object.dereference(data).await?;
data.follow_repo data.follow_repo

View File

@@ -64,7 +64,9 @@ pub(crate) async fn extract_and_dispatch_mentions(
let Some(href) = tag.get("href").and_then(|v| v.as_str()) else { let Some(href) = tag.get("href").and_then(|v| v.as_str()) else {
continue; continue;
}; };
let Ok(href_url) = Url::parse(href) else { continue }; let Ok(href_url) = Url::parse(href) else {
continue;
};
let Some(mentioned_user_id) = crate::urls::extract_user_id_from_url(&href_url) else { let Some(mentioned_user_id) = crate::urls::extract_user_id_from_url(&href_url) else {
continue; continue;
}; };

View File

@@ -1,7 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, protocol::verification::verify_domains_match,
fetch::object_id::ObjectId,
protocol::verification::verify_domains_match,
traits::Activity, traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -18,7 +16,9 @@ use super::helpers::check_guards;
pub struct LikeType; pub struct LikeType;
impl Default for LikeType { impl Default for LikeType {
fn default() -> Self { Self } fn default() -> Self {
Self
}
} }
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
@@ -36,8 +36,12 @@ impl Activity for LikeActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
verify_domains_match(&self.id, self.actor.inner())?; verify_domains_match(&self.id, self.actor.inner())?;

View File

@@ -1,9 +1,6 @@
use activitypub_federation::{ use activitypub_federation::{
activity_sending::SendActivityTask, activity_sending::SendActivityTask, config::Data, fetch::object_id::ObjectId,
config::Data, protocol::context::WithContext, traits::Activity,
fetch::object_id::ObjectId,
protocol::context::WithContext,
traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -35,12 +32,18 @@ impl Activity for MoveActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if &self.object != self.actor.inner() { if &self.object != self.actor.inner() {
return Err(Error::bad_request(anyhow::anyhow!("Move object must be the actor itself"))); return Err(Error::bad_request(anyhow::anyhow!(
"Move object must be the actor itself"
)));
} }
Ok(()) Ok(())
} }
@@ -67,11 +70,17 @@ impl Activity for MoveActivity {
for local_user_id in &affected { for local_user_id in &affected {
let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await { let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await {
Ok(a) => a, Ok(a) => a,
Err(e) => { tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor"); continue; } Err(e) => {
tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor");
continue;
}
}; };
let follow_id = match crate::urls::activity_url(&data.base_url) { let follow_id = match crate::urls::activity_url(&data.base_url) {
Ok(u) => u, Ok(u) => u,
Err(e) => { tracing::warn!(error = %e, "Move: failed to generate follow activity URL"); continue; } Err(e) => {
tracing::warn!(error = %e, "Move: failed to generate follow activity URL");
continue;
}
}; };
let follow = FollowActivity { let follow = FollowActivity {
id: follow_id, id: follow_id,
@@ -84,9 +93,14 @@ impl Activity for MoveActivity {
&local_actor, &local_actor,
vec![target.inbox_url.clone()], vec![target.inbox_url.clone()],
data, data,
).await { )
.await
{
Ok(s) => s, Ok(s) => s,
Err(e) => { tracing::warn!(error = %e, "Move: failed to prepare re-follow"); continue; } Err(e) => {
tracing::warn!(error = %e, "Move: failed to prepare re-follow");
continue;
}
}; };
for send in sends { for send in sends {
if let Err(e) = send.sign_and_send(data).await { if let Err(e) = send.sign_and_send(data).await {

View File

@@ -1,8 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, kinds::activity::RejectType, traits::Activity,
fetch::object_id::ObjectId,
kinds::activity::RejectType,
traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -29,12 +26,18 @@ impl Activity for RejectActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if self.actor.inner() != self.object.object.inner() { if self.actor.inner() != self.object.object.inner() {
return Err(Error::bad_request(anyhow::anyhow!("Reject actor does not match Follow target"))); return Err(Error::bad_request(anyhow::anyhow!(
"Reject actor does not match Follow target"
)));
} }
Ok(()) Ok(())
} }

View File

@@ -1,8 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, kinds::activity::UndoType, traits::Activity,
fetch::object_id::ObjectId,
kinds::activity::UndoType,
traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -28,8 +25,12 @@ impl Activity for UndoActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if let Some(inner_actor) = self.object.get("actor").and_then(|v| v.as_str()) if let Some(inner_actor) = self.object.get("actor").and_then(|v| v.as_str())
@@ -46,7 +47,11 @@ impl Activity for UndoActivity {
if check_guards(&self.id, self.actor.inner(), data).await? { if check_guards(&self.id, self.actor.inner(), data).await? {
return Ok(()); return Ok(());
} }
let obj_type = self.object.get("type").and_then(|t| t.as_str()).unwrap_or(""); let obj_type = self
.object
.get("type")
.and_then(|t| t.as_str())
.unwrap_or("");
match obj_type { match obj_type {
"Follow" => { "Follow" => {
if let Some(obj_url) = self.object.get("object").and_then(|o| o.as_str()) if let Some(obj_url) = self.object.get("object").and_then(|o| o.as_str())

View File

@@ -1,8 +1,5 @@
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data, fetch::object_id::ObjectId, kinds::activity::UpdateType, traits::Activity,
fetch::object_id::ObjectId,
kinds::activity::UpdateType,
traits::Activity,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@@ -32,8 +29,12 @@ impl Activity for UpdateActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { self.actor.inner() } &self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str())

View File

@@ -140,11 +140,7 @@ pub async fn get_local_actor(
.map_err(Error::from)? .map_err(Error::from)?
.ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?; .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?;
let (public_key, private_key) = match data let (public_key, private_key) = match data.actor_repo.get_local_actor_keypair(user_id).await? {
.actor_repo
.get_local_actor_keypair(user_id)
.await?
{
Some(kp) => kp, Some(kp) => kp,
None => { None => {
let kp = generate_actor_keypair()?; let kp = generate_actor_keypair()?;
@@ -230,10 +226,7 @@ impl Object for DbActor {
_ => return Ok(None), _ => return Ok(None),
}; };
let keypair = data let keypair = data.actor_repo.get_local_actor_keypair(user_id).await?;
.actor_repo
.get_local_actor_keypair(user_id)
.await?;
let (public_key, private_key) = match keypair { let (public_key, private_key) = match keypair {
Some(kp) => (kp.0, Some(kp.1)), Some(kp) => (kp.0, Some(kp.1)),
@@ -377,8 +370,14 @@ impl Object for DbActor {
Url::parse(&format!("{}{}", ap_id, suffix)).unwrap_or_else(|_| ap_id.clone()) Url::parse(&format!("{}{}", ap_id, suffix)).unwrap_or_else(|_| ap_id.clone())
}; };
let outbox_url = json.outbox.clone().unwrap_or_else(|| fallback("/outbox")); let outbox_url = json.outbox.clone().unwrap_or_else(|| fallback("/outbox"));
let followers_url = json.followers.clone().unwrap_or_else(|| fallback("/followers")); let followers_url = json
let following_url = json.following.clone().unwrap_or_else(|| fallback("/following")); .followers
.clone()
.unwrap_or_else(|| fallback("/followers"));
let following_url = json
.following
.clone()
.unwrap_or_else(|| fallback("/following"));
Ok(DbActor { Ok(DbActor {
user_id, user_id,

View File

@@ -51,17 +51,9 @@ pub trait ApObjectHandler: Send + Sync {
async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
async fn on_announce_received( async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
&self,
object_url: &Url,
actor_url: &Url,
) -> anyhow::Result<()>;
async fn on_announce_of_remote( async fn on_announce_of_remote(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
&self,
object_url: &Url,
actor_url: &Url,
) -> anyhow::Result<()>;
async fn on_mention( async fn on_mention(
&self, &self,

View File

@@ -1,7 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use crate::content::{ApContentReader, ApObjectHandler}; use crate::content::{ApContentReader, ApObjectHandler};
use crate::repository::{ActivityRepository, ActorRepository, BlocklistRepository, FollowRepository}; use crate::repository::{
ActivityRepository, ActorRepository, BlocklistRepository, FollowRepository,
};
use crate::user::ApUserRepository; use crate::user::ApUserRepository;
/// Typed event emitted by the federation layer. /// Typed event emitted by the federation layer.

View File

@@ -15,18 +15,20 @@ pub(crate) mod urls;
pub mod user; pub mod user;
pub mod webfinger; pub mod webfinger;
pub use urls::AS_PUBLIC;
pub use activitypub_federation::kinds::object::NoteType; pub use activitypub_federation::kinds::object::NoteType;
pub use content::{ApContentReader, ApObjectHandler}; pub use content::{ApContentReader, ApObjectHandler};
pub use data::{EventPublisher, FederationData, FederationEvent}; pub use data::{EventPublisher, FederationData, FederationEvent};
pub use error::Error; pub use error::Error;
pub use federation::ApFederationConfig; pub use federation::ApFederationConfig;
pub use repository::{ pub use repository::{
ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository,
Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor, Follower, FollowerStatus, FollowingStatus, RemoteActor,
}; };
pub use service::ActivityPubService; pub use service::ActivityPubService;
pub use user::{ApActorType, ApProfileField, ApUser, ApUserRepository, ApVisibility, LookedUpActor}; pub use urls::AS_PUBLIC;
pub use user::{
ApActorType, ApProfileField, ApUser, ApUserRepository, ApVisibility, LookedUpActor,
};
#[cfg(test)] #[cfg(test)]
#[path = "tests/integration.rs"] #[path = "tests/integration.rs"]

View File

@@ -13,23 +13,8 @@ pub trait BlocklistRepository: Send + Sync {
async fn is_domain_blocked(&self, domain: &str) -> Result<bool>; async fn is_domain_blocked(&self, domain: &str) -> Result<bool>;
// ── Per-user actor blocklist ──────────────────────────────────────────── // ── Per-user actor blocklist ────────────────────────────────────────────
async fn add_blocked_actor( async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
&self, async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
local_user_id: uuid::Uuid, async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>>;
actor_url: &str, async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool>;
) -> Result<()>;
async fn remove_blocked_actor(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> Result<()>;
async fn get_blocked_actors(
&self,
local_user_id: uuid::Uuid,
) -> Result<Vec<String>>;
async fn is_actor_blocked(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> Result<bool>;
} }

View File

@@ -38,16 +38,11 @@ pub trait FollowRepository: Send + Sync {
remote_actor_url: &str, remote_actor_url: &str,
status: FollowerStatus, status: FollowerStatus,
) -> Result<()>; ) -> Result<()>;
async fn get_pending_followers( async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>>;
&self,
local_user_id: uuid::Uuid,
) -> Result<Vec<RemoteActor>>;
/// Return deduplicated inbox URLs (shared_inbox preferred) for accepted /// Return deduplicated inbox URLs (shared_inbox preferred) for accepted
/// followers, excluding blocked actors/domains. DB-side filtering. /// followers, excluding blocked actors/domains. DB-side filtering.
async fn get_accepted_follower_inboxes( async fn get_accepted_follower_inboxes(&self, local_user_id: uuid::Uuid)
&self, -> Result<Vec<String>>;
local_user_id: uuid::Uuid,
) -> Result<Vec<String>>;
// ── Outbound following ────────────────────────────────────────────────── // ── Outbound following ──────────────────────────────────────────────────
async fn add_following( async fn add_following(
@@ -61,11 +56,7 @@ pub trait FollowRepository: Send + Sync {
local_user_id: uuid::Uuid, local_user_id: uuid::Uuid,
remote_actor_url: &str, remote_actor_url: &str,
) -> Result<Option<String>>; ) -> Result<Option<String>>;
async fn remove_following( async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> Result<()>;
async fn get_following(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>>; async fn get_following(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>>;
async fn get_following_page( async fn get_following_page(
&self, &self,

View File

@@ -1,25 +1,34 @@
use activitypub_federation::{activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext}; use activitypub_federation::{
activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext,
};
use url::Url; use url::Url;
use crate::{ use crate::{activities::CreateActivity, actors::get_local_actor, federation::ApFederationConfig};
activities::CreateActivity,
actors::get_local_actor,
federation::ApFederationConfig,
};
use super::{ActivityPubService, delivery::send_with_retry}; use super::{ActivityPubService, delivery::send_with_retry};
impl ActivityPubService { impl ActivityPubService {
pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(super::HTTP_FETCH_TIMEOUT_SECS)) .timeout(std::time::Duration::from_secs(
super::HTTP_FETCH_TIMEOUT_SECS,
))
.build()?; .build()?;
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let actor = url::Url::parse(actor_url)?; let actor = url::Url::parse(actor_url)?;
let root: serde_json::Value = client.get(outbox_url).header("Accept", "application/activity+json").send().await?.json().await?; let root: serde_json::Value = client
.get(outbox_url)
.header("Accept", "application/activity+json")
.send()
.await?
.json()
.await?;
let first = match root.get("first").and_then(|v| v.as_str()) { let first = match root.get("first").and_then(|v| v.as_str()) {
Some(url) => url.to_string(), Some(url) => url.to_string(),
None => { tracing::debug!(outbox = %outbox_url, "outbox has no first page"); return Ok(()); } None => {
tracing::debug!(outbox = %outbox_url, "outbox has no first page");
return Ok(());
}
}; };
let mut current_url = first; let mut current_url = first;
let mut visited = std::collections::HashSet::new(); let mut visited = std::collections::HashSet::new();
@@ -28,16 +37,40 @@ impl ActivityPubService {
tracing::warn!(url = %current_url, "backfill: loop detected, stopping"); tracing::warn!(url = %current_url, "backfill: loop detected, stopping");
break; break;
} }
let page: serde_json::Value = match client.get(&current_url).header("Accept", "application/activity+json").send().await { let page: serde_json::Value = match client
Ok(resp) => match resp.json().await { Ok(v) => v, Err(e) => { tracing::error!(error = %e, "backfill: failed to parse page JSON"); break; } }, .get(&current_url)
Err(e) => { tracing::error!(error = %e, "backfill: HTTP request failed"); break; } .header("Accept", "application/activity+json")
.send()
.await
{
Ok(resp) => match resp.json().await {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "backfill: failed to parse page JSON");
break;
}
},
Err(e) => {
tracing::error!(error = %e, "backfill: HTTP request failed");
break;
}
}; };
if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) { if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) {
for item in items { for item in items {
let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or("");
if activity_type != "Create" && activity_type != "Add" { continue; } if activity_type != "Create" && activity_type != "Add" {
let Some(object) = item.get("object").filter(|o| o.is_object()).cloned() else { continue }; continue;
let Some(ap_id) = object.get("id").and_then(|v| v.as_str()).and_then(|s| url::Url::parse(s).ok()) else { continue }; }
let Some(object) = item.get("object").filter(|o| o.is_object()).cloned() else {
continue;
};
let Some(ap_id) = object
.get("id")
.and_then(|v| v.as_str())
.and_then(|s| url::Url::parse(s).ok())
else {
continue;
};
if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await { if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await {
tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item"); tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item");
} }
@@ -60,7 +93,16 @@ impl ActivityPubService {
let max_attempts = self.delivery_max_attempts; let max_attempts = self.delivery_max_attempts;
let initial_delay = self.delivery_initial_delay_secs; let initial_delay = self.delivery_initial_delay_secs;
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = ActivityPubService::run_backfill(config, base_url, owner_user_id, follower_inbox_url, max_attempts, initial_delay).await { if let Err(e) = ActivityPubService::run_backfill(
config,
base_url,
owner_user_id,
follower_inbox_url,
max_attempts,
initial_delay,
)
.await
{
tracing::warn!(error = %e, "backfill: task failed"); tracing::warn!(error = %e, "backfill: task failed");
} }
}); });
@@ -76,7 +118,9 @@ impl ActivityPubService {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
const BATCH_SIZE: usize = 20; const BATCH_SIZE: usize = 20;
let data = config.to_request_data(); let data = config.to_request_data();
let local_actor = get_local_actor(owner_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(owner_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let inbox = Url::parse(&follower_inbox_url)?; let inbox = Url::parse(&follower_inbox_url)?;
// Cursor-based pagination via get_local_objects_page (newest-first). // Cursor-based pagination via get_local_objects_page (newest-first).
@@ -105,18 +149,27 @@ impl ActivityPubService {
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes()) uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes())
))?; ))?;
let create = CreateActivity { let create = CreateActivity {
id: create_id, kind: Default::default(), id: create_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()), actor: ObjectId::from(local_actor.ap_id.clone()),
object: object_json.clone(), to: vec![], cc: vec![], bto: vec![], bcc: vec![], object: object_json.clone(),
to: vec![],
cc: vec![],
bto: vec![],
bcc: vec![],
}; };
let sends = SendActivityTask::prepare( let sends = SendActivityTask::prepare(
&WithContext::new_default(create), &WithContext::new_default(create),
&local_actor, &local_actor,
vec![inbox.clone()], vec![inbox.clone()],
&data, &data,
).await?; )
.await?;
total += 1; total += 1;
if send_with_retry(sends, &data, max_attempts, initial_delay).await.is_empty() { if send_with_retry(sends, &data, max_attempts, initial_delay)
.await
.is_empty()
{
success_count += 1; success_count += 1;
} else { } else {
failure_count += 1; failure_count += 1;
@@ -127,7 +180,10 @@ impl ActivityPubService {
break; break;
} }
tokio::time::sleep(std::time::Duration::from_millis(super::BATCH_FETCH_SLEEP_MS)).await; tokio::time::sleep(std::time::Duration::from_millis(
super::BATCH_FETCH_SLEEP_MS,
))
.await;
} }
tracing::info!( tracing::info!(

View File

@@ -1,10 +1,12 @@
use activitypub_federation::{fetch::object_id::ObjectId, protocol::context::WithContext, traits::Object}; use activitypub_federation::{
fetch::object_id::ObjectId, protocol::context::WithContext, traits::Object,
};
use url::Url; use url::Url;
use crate::{ use crate::{
activities::{ activities::{
AddActivity, AnnounceActivity, CreateActivity, DeleteActivity, AddActivity, AnnounceActivity, CreateActivity, DeleteActivity, MoveActivity, UndoActivity,
MoveActivity, UndoActivity, UpdateActivity, UpdateActivity,
}, },
actors::get_local_actor, actors::get_local_actor,
urls::activity_url, urls::activity_url,
@@ -22,10 +24,18 @@ impl ActivityPubService {
let announce_id = url::Url::parse(&format!( let announce_id = url::Url::parse(&format!(
"{}/activities/announce/{}", "{}/activities/announce/{}",
self.base_url, self.base_url,
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", local_user_id, object_ap_id).as_bytes()), uuid::Uuid::new_v5(
)).map_err(|e| anyhow::anyhow!("{e}"))?; &uuid::Uuid::NAMESPACE_URL,
format!("{}/{}", local_user_id, object_ap_id).as_bytes()
),
))
.map_err(|e| anyhow::anyhow!("{e}"))?;
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
return Ok(());
};
let announce = AnnounceActivity { let announce = AnnounceActivity {
id: announce_id, id: announce_id,
kind: Default::default(), kind: Default::default(),
@@ -35,8 +45,11 @@ impl ActivityPubService {
to: vec![crate::urls::AS_PUBLIC.to_string()], to: vec![crate::urls::AS_PUBLIC.to_string()],
cc: vec![local_actor.followers_url.to_string()], cc: vec![local_actor.followers_url.to_string()],
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, announce).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, inboxes, announce)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_undo_announce_to_followers( pub async fn broadcast_undo_announce_to_followers(
@@ -47,19 +60,30 @@ impl ActivityPubService {
let announce_id = url::Url::parse(&format!( let announce_id = url::Url::parse(&format!(
"{}/activities/announce/{}", "{}/activities/announce/{}",
self.base_url, self.base_url,
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", local_user_id, object_ap_id).as_bytes()), uuid::Uuid::new_v5(
)).map_err(|e| anyhow::anyhow!("{e}"))?; &uuid::Uuid::NAMESPACE_URL,
format!("{}/{}", local_user_id, object_ap_id).as_bytes()
),
))
.map_err(|e| anyhow::anyhow!("{e}"))?;
let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
return Ok(());
};
let undo = UndoActivity { let undo = UndoActivity {
id: undo_id, id: undo_id,
kind: Default::default(), kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()), actor: ObjectId::from(local_actor.ap_id.clone()),
object: serde_json::json!({"type":"Announce","id":announce_id.to_string(),"actor":local_actor.ap_id.to_string(),"object":object_ap_id.to_string()}), object: serde_json::json!({"type":"Announce","id":announce_id.to_string(),"actor":local_actor.ap_id.to_string(),"object":object_ap_id.to_string()}),
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, inboxes, undo)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_like_to_inbox( pub async fn broadcast_like_to_inbox(
@@ -69,11 +93,16 @@ impl ActivityPubService {
author_inbox_url: url::Url, author_inbox_url: url::Url,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(liker_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(liker_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let like_id = url::Url::parse(&format!( let like_id = url::Url::parse(&format!(
"{}/activities/like/{}", "{}/activities/like/{}",
self.base_url, self.base_url,
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", liker_user_id, object_ap_id).as_bytes()), uuid::Uuid::new_v5(
&uuid::Uuid::NAMESPACE_URL,
format!("{}/{}", liker_user_id, object_ap_id).as_bytes()
),
))?; ))?;
let like = crate::activities::LikeActivity { let like = crate::activities::LikeActivity {
id: like_id, id: like_id,
@@ -81,8 +110,11 @@ impl ActivityPubService {
actor: ObjectId::from(local_actor.ap_id.clone()), actor: ObjectId::from(local_actor.ap_id.clone()),
object: object_ap_id, object: object_ap_id,
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![author_inbox_url], like).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, vec![author_inbox_url], like)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_undo_like_to_inbox( pub async fn broadcast_undo_like_to_inbox(
@@ -92,11 +124,16 @@ impl ActivityPubService {
author_inbox_url: url::Url, author_inbox_url: url::Url,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(liker_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(liker_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let like_id = url::Url::parse(&format!( let like_id = url::Url::parse(&format!(
"{}/activities/like/{}", "{}/activities/like/{}",
self.base_url, self.base_url,
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", liker_user_id, object_ap_id).as_bytes()), uuid::Uuid::new_v5(
&uuid::Uuid::NAMESPACE_URL,
format!("{}/{}", liker_user_id, object_ap_id).as_bytes()
),
))?; ))?;
let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
let undo = UndoActivity { let undo = UndoActivity {
@@ -105,8 +142,11 @@ impl ActivityPubService {
actor: ObjectId::from(local_actor.ap_id.clone()), actor: ObjectId::from(local_actor.ap_id.clone()),
object: serde_json::json!({"type":"Like","id":like_id.to_string(),"actor":local_actor.ap_id.to_string(),"object":object_ap_id.to_string()}), object: serde_json::json!({"type":"Like","id":like_id.to_string(),"actor":local_actor.ap_id.to_string(),"object":object_ap_id.to_string()}),
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![author_inbox_url], undo).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, vec![author_inbox_url], undo)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_delete_to_followers( pub async fn broadcast_delete_to_followers(
@@ -115,7 +155,11 @@ impl ActivityPubService {
ap_id: Url, ap_id: Url,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
return Ok(());
};
let delete = DeleteActivity { let delete = DeleteActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
kind: Default::default(), kind: Default::default(),
@@ -124,8 +168,11 @@ impl ActivityPubService {
to: vec![crate::urls::AS_PUBLIC.to_string()], to: vec![crate::urls::AS_PUBLIC.to_string()],
cc: vec![local_actor.followers_url.to_string()], cc: vec![local_actor.followers_url.to_string()],
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, delete).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, inboxes, delete)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_add_to_followers( pub async fn broadcast_add_to_followers(
@@ -135,7 +182,11 @@ impl ActivityPubService {
object: serde_json::Value, object: serde_json::Value,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
return Ok(());
};
let add = AddActivity { let add = AddActivity {
id: ap_id, id: ap_id,
kind: Default::default(), kind: Default::default(),
@@ -144,8 +195,11 @@ impl ActivityPubService {
to: vec![crate::urls::AS_PUBLIC.to_string()], to: vec![crate::urls::AS_PUBLIC.to_string()],
cc: vec![local_actor.followers_url.to_string()], cc: vec![local_actor.followers_url.to_string()],
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, add).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, inboxes, add)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_undo_add_to_followers( pub async fn broadcast_undo_add_to_followers(
@@ -154,15 +208,22 @@ impl ActivityPubService {
watchlist_entry_ap_id: Url, watchlist_entry_ap_id: Url,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
return Ok(());
};
let undo = UndoActivity { let undo = UndoActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
kind: Default::default(), kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()), actor: ObjectId::from(local_actor.ap_id.clone()),
object: serde_json::json!({"type":"Add","id":watchlist_entry_ap_id.as_str(),"object":{"id":watchlist_entry_ap_id.as_str()}}), object: serde_json::json!({"type":"Add","id":watchlist_entry_ap_id.as_str(),"object":{"id":watchlist_entry_ap_id.as_str()}}),
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, inboxes, undo)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_create_note( pub async fn broadcast_create_note(
@@ -175,13 +236,18 @@ impl ActivityPubService {
return Ok(()); return Ok(());
} }
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
return Ok(());
};
let note_id_str = note["id"].as_str().unwrap_or(""); let note_id_str = note["id"].as_str().unwrap_or("");
let create_id = Url::parse(&format!( let create_id = Url::parse(&format!(
"{}/activities/create/{}", "{}/activities/create/{}",
self.base_url, self.base_url,
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, note_id_str.as_bytes()) uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, note_id_str.as_bytes())
)).map_err(|e| anyhow::anyhow!("{e}"))?; ))
.map_err(|e| anyhow::anyhow!("{e}"))?;
let (to, cc) = visibility_addressing(visibility, &local_actor.followers_url); let (to, cc) = visibility_addressing(visibility, &local_actor.followers_url);
let create = CreateActivity { let create = CreateActivity {
id: create_id, id: create_id,
@@ -193,8 +259,11 @@ impl ActivityPubService {
bto: vec![], bto: vec![],
bcc: vec![], bcc: vec![],
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, create).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, inboxes, create)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_update_note( pub async fn broadcast_update_note(
@@ -207,7 +276,11 @@ impl ActivityPubService {
return Ok(()); return Ok(());
} }
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; let Some((local_actor, inboxes)) =
self.accepted_follower_inboxes(&data, local_user_id).await?
else {
return Ok(());
};
let (to, cc) = visibility_addressing(visibility, &local_actor.followers_url); let (to, cc) = visibility_addressing(visibility, &local_actor.followers_url);
let update = crate::activities::UpdateActivity { let update = crate::activities::UpdateActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
@@ -217,16 +290,30 @@ impl ActivityPubService {
to, to,
cc, cc,
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, inboxes, update)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> { pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(user_id, &data)
let person = local_actor.clone().into_json(&data).await.map_err(|e| anyhow::anyhow!("{e}"))?; .await
let person_json = serde_json::to_value(WithContext::new(person, crate::urls::actor_ap_context()))?; .map_err(|e| anyhow::anyhow!("{e}"))?;
let update_id = Url::parse(&format!("{}/activities/update/{}", self.base_url, uuid::Uuid::new_v4()))?; let person = local_actor
.clone()
.into_json(&data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let person_json =
serde_json::to_value(WithContext::new(person, crate::urls::actor_ap_context()))?;
let update_id = Url::parse(&format!(
"{}/activities/update/{}",
self.base_url,
uuid::Uuid::new_v4()
))?;
let update = UpdateActivity { let update = UpdateActivity {
id: update_id, id: update_id,
kind: Default::default(), kind: Default::default(),
@@ -240,8 +327,11 @@ impl ActivityPubService {
return Ok(()); return Ok(());
}; };
tracing::info!(%user_id, inbox_count = inboxes.len(), "broadcasting actor update"); tracing::info!(%user_id, inbox_count = inboxes.len(), "broadcasting actor update");
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, inboxes, update)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn broadcast_move( pub async fn broadcast_move(
@@ -250,7 +340,9 @@ impl ActivityPubService {
new_actor_url: url::Url, new_actor_url: url::Url,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else { let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else {
tracing::info!(%user_id, "broadcast_move: no accepted followers"); tracing::info!(%user_id, "broadcast_move: no accepted followers");
return Ok(()); return Ok(());
@@ -262,8 +354,11 @@ impl ActivityPubService {
object: local_actor.ap_id.clone(), object: local_actor.ap_id.clone(),
target: new_actor_url.clone(), target: new_actor_url.clone(),
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, move_activity).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; .prepare_broadcast(&data, &local_actor, inboxes, move_activity)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await?;
tracing::info!(%user_id, target = %new_actor_url, "broadcast_move: dispatched"); tracing::info!(%user_id, target = %new_actor_url, "broadcast_move: dispatched");
Ok(()) Ok(())
} }
@@ -280,10 +375,7 @@ pub(super) fn visibility_addressing(
vec![crate::urls::AS_PUBLIC.to_string()], vec![crate::urls::AS_PUBLIC.to_string()],
vec![followers_url.to_string()], vec![followers_url.to_string()],
), ),
ApVisibility::FollowersOnly => ( ApVisibility::FollowersOnly => (vec![followers_url.to_string()], vec![]),
vec![followers_url.to_string()],
vec![],
),
ApVisibility::Private => (vec![], vec![]), ApVisibility::Private => (vec![], vec![]),
} }
} }

View File

@@ -57,13 +57,23 @@ impl Activity for RawActivity {
type DataType = FederationData; type DataType = FederationData;
type Error = Error; type Error = Error;
fn id(&self) -> &Url { &self.id } fn id(&self) -> &Url {
fn actor(&self) -> &Url { &self.actor_url } &self.id
}
fn actor(&self) -> &Url {
&self.actor_url
}
async fn verify(&self, _data: &activitypub_federation::config::Data<Self::DataType>) -> Result<(), Self::Error> { async fn verify(
&self,
_data: &activitypub_federation::config::Data<Self::DataType>,
) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
async fn receive(self, _data: &activitypub_federation::config::Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(
self,
_data: &activitypub_federation::config::Data<Self::DataType>,
) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
} }
@@ -96,8 +106,7 @@ impl ActivityPubService {
let max_attempts = self.delivery_max_attempts; let max_attempts = self.delivery_max_attempts;
let initial_delay = self.delivery_initial_delay_secs; let initial_delay = self.delivery_initial_delay_secs;
tokio::spawn(async move { tokio::spawn(async move {
let failures = let failures = send_with_retry(sends, &data, max_attempts, initial_delay).await;
send_with_retry(sends, &data, max_attempts, initial_delay).await;
if !failures.is_empty() { if !failures.is_empty() {
tracing::warn!(count = failures.len(), "some deliveries failed permanently"); tracing::warn!(count = failures.len(), "some deliveries failed permanently");
} }
@@ -128,9 +137,12 @@ impl ActivityPubService {
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.and_then(|s| Url::parse(s).ok()) .and_then(|s| Url::parse(s).ok())
.unwrap_or_else(|| actor.ap_id.clone()); .unwrap_or_else(|| actor.ap_id.clone());
let raw = RawActivity { id, actor_url, value: activity.clone() }; let raw = RawActivity {
let sends = id,
SendActivityTask::prepare(&raw, &actor, vec![inbox.clone()], &data).await?; actor_url,
value: activity.clone(),
};
let sends = SendActivityTask::prepare(&raw, &actor, vec![inbox.clone()], &data).await?;
let failures = send_with_retry( let failures = send_with_retry(
sends, sends,
&data, &data,
@@ -172,7 +184,8 @@ impl ActivityPubService {
where where
A: Activity + Serialize + Debug + Send + Sync, A: Activity + Serialize + Debug + Send + Sync,
{ {
let with_ctx = activitypub_federation::protocol::context::WithContext::new_default(activity); let with_ctx =
activitypub_federation::protocol::context::WithContext::new_default(activity);
let activity_json = serde_json::to_value(&with_ctx)?; let activity_json = serde_json::to_value(&with_ctx)?;
let sends = let sends =
SendActivityTask::prepare(&with_ctx, local_actor, inboxes.clone(), data).await?; SendActivityTask::prepare(&with_ctx, local_actor, inboxes.clone(), data).await?;

View File

@@ -20,112 +20,228 @@ impl ActivityPubService {
return self.follow_local(local_user_id, parts[0], &data).await; return self.follow_local(local_user_id, parts[0], &data).await;
} }
let remote_actor = self.webfinger_https(handle, &data).await?; let remote_actor = self.webfinger_https(handle, &data).await?;
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
let follow_id_str = follow_id.to_string(); let follow_id_str = follow_id.to_string();
let remote = RemoteActor { let remote = RemoteActor {
url: remote_actor.ap_id.to_string(), url: remote_actor.ap_id.to_string(),
handle: format!("{}@{}", remote_actor.username, remote_actor.ap_id.host_str().unwrap_or("")), handle: format!(
"{}@{}",
remote_actor.username,
remote_actor.ap_id.host_str().unwrap_or("")
),
inbox_url: remote_actor.inbox_url.to_string(), inbox_url: remote_actor.inbox_url.to_string(),
shared_inbox_url: remote_actor.shared_inbox_url.as_ref().map(|u| u.to_string()), shared_inbox_url: remote_actor
.shared_inbox_url
.as_ref()
.map(|u| u.to_string()),
display_name: Some(remote_actor.username.clone()), display_name: Some(remote_actor.username.clone()),
avatar_url: remote_actor.avatar_url.as_ref().map(|u| u.to_string()), avatar_url: remote_actor.avatar_url.as_ref().map(|u| u.to_string()),
outbox_url: Some(remote_actor.outbox_url.to_string()), outbox_url: Some(remote_actor.outbox_url.to_string()),
}; };
// Save BEFORE delivering — prevents lost state on process restart. // Save BEFORE delivering — prevents lost state on process restart.
data.follow_repo.add_following(local_user_id, remote, &follow_id_str).await?; data.follow_repo
.add_following(local_user_id, remote, &follow_id_str)
.await?;
let follow = FollowActivity { let follow = FollowActivity {
id: Url::parse(&follow_id_str)?, id: Url::parse(&follow_id_str)?,
kind: Default::default(), kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()), actor: ObjectId::from(local_actor.ap_id.clone()),
object: ObjectId::from(remote_actor.ap_id.clone()), object: ObjectId::from(remote_actor.ap_id.clone()),
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![remote_actor.inbox()], follow).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await .prepare_broadcast(&data, &local_actor, vec![remote_actor.inbox()], follow)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await
} }
pub async fn unfollow(&self, local_user_id: uuid::Uuid, actor_url_str: &str) -> anyhow::Result<()> { pub async fn unfollow(
&self,
local_user_id: uuid::Uuid,
actor_url_str: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
if actor_url_str.starts_with(&self.base_url) { if actor_url_str.starts_with(&self.base_url) {
return self.unfollow_local(local_user_id, actor_url_str, &data).await; return self
.unfollow_local(local_user_id, actor_url_str, &data)
.await;
} }
let remote = data.actor_repo.get_remote_actor(actor_url_str).await? let remote = data
.actor_repo
.get_remote_actor(actor_url_str)
.await?
.ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?;
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let remote_ap_id = Url::parse(actor_url_str)?; let remote_ap_id = Url::parse(actor_url_str)?;
let inbox = Url::parse(&remote.inbox_url)?; let inbox = Url::parse(&remote.inbox_url)?;
let follow_id = data.follow_repo.get_follow_activity_id(local_user_id, actor_url_str).await? let follow_id = data
.follow_repo
.get_follow_activity_id(local_user_id, actor_url_str)
.await?
.and_then(|id| Url::parse(&id).ok()) .and_then(|id| Url::parse(&id).ok())
.unwrap_or_else(|| activity_url(&self.base_url).unwrap_or_else(|_| remote_ap_id.clone())); .unwrap_or_else(|| {
let follow = FollowActivity { id: follow_id, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: ObjectId::from(remote_ap_id) }; activity_url(&self.base_url).unwrap_or_else(|_| remote_ap_id.clone())
});
let follow = FollowActivity {
id: follow_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object: ObjectId::from(remote_ap_id),
};
let undo = UndoActivity { let undo = UndoActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
kind: Default::default(), kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()), actor: ObjectId::from(local_actor.ap_id.clone()),
object: serde_json::to_value(&follow).map_err(|e| anyhow::anyhow!("{e}"))?, object: serde_json::to_value(&follow).map_err(|e| anyhow::anyhow!("{e}"))?,
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], undo).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; .prepare_broadcast(&data, &local_actor, vec![inbox], undo)
data.follow_repo.remove_following(local_user_id, actor_url_str).await?; .await?;
data.object_handler.on_actor_removed(&Url::parse(actor_url_str)?).await?; self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await?;
data.follow_repo
.remove_following(local_user_id, actor_url_str)
.await?;
data.object_handler
.on_actor_removed(&Url::parse(actor_url_str)?)
.await?;
Ok(()) Ok(())
} }
pub async fn accept_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { pub async fn accept_follower(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, &data)
let remote_actor = data.actor_repo.get_remote_actor(remote_actor_url).await? .await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let remote_actor = data
.actor_repo
.get_remote_actor(remote_actor_url)
.await?
.ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?;
let follow_id_str = data.follow_repo.get_follower_follow_activity_id(local_user_id, remote_actor_url).await? let follow_id_str = data
.ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?; .follow_repo
let follow = FollowActivity { id: Url::parse(&follow_id_str)?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; .get_follower_follow_activity_id(local_user_id, remote_actor_url)
let accept = AcceptActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; .await?
data.follow_repo.update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted).await?; .ok_or_else(|| {
anyhow::anyhow!("follow activity id not found for {}", remote_actor_url)
})?;
let follow = FollowActivity {
id: Url::parse(&follow_id_str)?,
kind: Default::default(),
actor: ObjectId::from(Url::parse(remote_actor_url)?),
object: ObjectId::from(local_actor.ap_id.clone()),
};
let accept = AcceptActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object: follow,
};
data.follow_repo
.update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted)
.await?;
let inbox = Url::parse(&remote_actor.inbox_url)?; let inbox = Url::parse(&remote_actor.inbox_url)?;
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], accept).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; .prepare_broadcast(&data, &local_actor, vec![inbox], accept)
let target_inbox = remote_actor.shared_inbox_url.clone().unwrap_or_else(|| remote_actor.inbox_url.clone()); .await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await?;
let target_inbox = remote_actor
.shared_inbox_url
.clone()
.unwrap_or_else(|| remote_actor.inbox_url.clone());
self.spawn_backfill(local_user_id, target_inbox); self.spawn_backfill(local_user_id, target_inbox);
Ok(()) Ok(())
} }
pub async fn reject_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { pub async fn reject_follower(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, &data)
let remote_actor = data.actor_repo.get_remote_actor(remote_actor_url).await? .await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let remote_actor = data
.actor_repo
.get_remote_actor(remote_actor_url)
.await?
.ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?;
let follow = FollowActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; let follow = FollowActivity {
let reject = RejectActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
kind: Default::default(),
actor: ObjectId::from(Url::parse(remote_actor_url)?),
object: ObjectId::from(local_actor.ap_id.clone()),
};
let reject = RejectActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object: follow,
};
let inbox = Url::parse(&remote_actor.inbox_url)?; let inbox = Url::parse(&remote_actor.inbox_url)?;
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], reject).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; .prepare_broadcast(&data, &local_actor, vec![inbox], reject)
data.follow_repo.remove_follower(local_user_id, remote_actor_url).await?; .await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await?;
data.follow_repo
.remove_follower(local_user_id, remote_actor_url)
.await?;
Ok(()) Ok(())
} }
pub async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { pub async fn get_pending_followers(
&self,
local_user_id: uuid::Uuid,
) -> anyhow::Result<Vec<RemoteActor>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.follow_repo.get_pending_followers(local_user_id).await data.follow_repo.get_pending_followers(local_user_id).await
} }
pub async fn get_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { pub async fn get_accepted_followers(
&self,
local_user_id: uuid::Uuid,
) -> anyhow::Result<Vec<RemoteActor>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
Ok(data.follow_repo.get_followers(local_user_id).await? Ok(data
.follow_repo
.get_followers(local_user_id)
.await?
.into_iter() .into_iter()
.filter(|f| f.status == FollowerStatus::Accepted) .filter(|f| f.status == FollowerStatus::Accepted)
.map(|f| f.actor) .map(|f| f.actor)
.collect()) .collect())
} }
pub async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<usize> { pub async fn count_accepted_followers(
&self,
local_user_id: uuid::Uuid,
) -> anyhow::Result<usize> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
Ok(data.follow_repo.get_followers(local_user_id).await? Ok(data
.follow_repo
.get_followers(local_user_id)
.await?
.into_iter() .into_iter()
.filter(|f| f.status == FollowerStatus::Accepted) .filter(|f| f.status == FollowerStatus::Accepted)
.count()) .count())
} }
pub async fn get_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { pub async fn get_following(
&self,
local_user_id: uuid::Uuid,
) -> anyhow::Result<Vec<RemoteActor>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.follow_repo.get_following(local_user_id).await data.follow_repo.get_following(local_user_id).await
} }
@@ -135,17 +251,37 @@ impl ActivityPubService {
data.follow_repo.count_following(local_user_id).await data.follow_repo.count_following(local_user_id).await
} }
pub async fn remove_follower(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn remove_follower(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.follow_repo.remove_follower(local_user_id, actor_url).await data.follow_repo
.remove_follower(local_user_id, actor_url)
.await
} }
pub async fn block_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn block_actor(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.blocklist_repo.add_blocked_actor(local_user_id, actor_url).await?; data.blocklist_repo
let _ = data.follow_repo.remove_follower(local_user_id, actor_url).await; .add_blocked_actor(local_user_id, actor_url)
let _ = data.follow_repo.remove_following(local_user_id, actor_url).await; .await?;
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let _ = data
.follow_repo
.remove_follower(local_user_id, actor_url)
.await;
let _ = data
.follow_repo
.remove_following(local_user_id, actor_url)
.await;
let local_actor = get_local_actor(local_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
if let Ok(Some(remote_actor)) = data.actor_repo.get_remote_actor(actor_url).await { if let Ok(Some(remote_actor)) = data.actor_repo.get_remote_actor(actor_url).await {
let block = crate::activities::BlockActivity { let block = crate::activities::BlockActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
@@ -154,25 +290,48 @@ impl ActivityPubService {
object: Url::parse(actor_url)?, object: Url::parse(actor_url)?,
}; };
let inbox = Url::parse(&remote_actor.inbox_url)?; let inbox = Url::parse(&remote_actor.inbox_url)?;
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], block).await?; let (json, sends, inboxes) = self
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; .prepare_broadcast(&data, &local_actor, vec![inbox], block)
.await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json)
.await?;
} }
Ok(()) Ok(())
} }
pub async fn unblock_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn unblock_actor(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.blocklist_repo.remove_blocked_actor(local_user_id, actor_url).await data.blocklist_repo
.remove_blocked_actor(local_user_id, actor_url)
.await
} }
pub async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { pub async fn get_blocked_actors(
&self,
local_user_id: uuid::Uuid,
) -> anyhow::Result<Vec<RemoteActor>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let actor_urls = data.blocklist_repo.get_blocked_actors(local_user_id).await?; let actor_urls = data
.blocklist_repo
.get_blocked_actors(local_user_id)
.await?;
let mut actors = Vec::new(); let mut actors = Vec::new();
for url in actor_urls { for url in actor_urls {
let actor = match data.actor_repo.get_remote_actor(&url).await { let actor = match data.actor_repo.get_remote_actor(&url).await {
Ok(Some(a)) => a, Ok(Some(a)) => a,
_ => RemoteActor { url: url.clone(), handle: url.clone(), inbox_url: url.clone(), shared_inbox_url: None, display_name: None, avatar_url: None, outbox_url: None }, _ => RemoteActor {
url: url.clone(),
handle: url.clone(),
inbox_url: url.clone(),
shared_inbox_url: None,
display_name: None,
avatar_url: None,
outbox_url: None,
},
}; };
actors.push(actor); actors.push(actor);
} }
@@ -185,15 +344,27 @@ impl ActivityPubService {
target_username: &str, target_username: &str,
data: &activitypub_federation::config::Data<FederationData>, data: &activitypub_federation::config::Data<FederationData>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let target = data.user_repo.find_by_username(target_username).await? let target = data
.user_repo
.find_by_username(target_username)
.await?
.ok_or_else(|| anyhow::anyhow!("user not found: {}", target_username))?; .ok_or_else(|| anyhow::anyhow!("user not found: {}", target_username))?;
if target.id == local_user_id { if target.id == local_user_id {
return Err(anyhow::anyhow!("cannot follow yourself")); return Err(anyhow::anyhow!("cannot follow yourself"));
} }
let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string();
let target_actor_url = crate::urls::actor_url(&self.base_url, target.id); let target_actor_url = crate::urls::actor_url(&self.base_url, target.id);
let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?.to_string(); let follow_id = activity_url(&self.base_url)
data.follow_repo.add_follower(target.id, &follower_actor_url, FollowerStatus::Accepted, &follow_id).await?; .map_err(|e| anyhow::anyhow!("{e}"))?
.to_string();
data.follow_repo
.add_follower(
target.id,
&follower_actor_url,
FollowerStatus::Accepted,
&follow_id,
)
.await?;
let target_as_remote = RemoteActor { let target_as_remote = RemoteActor {
url: target_actor_url.to_string(), url: target_actor_url.to_string(),
handle: format!("{}@{}", target.username, data.domain), handle: format!("{}@{}", target.username, data.domain),
@@ -203,8 +374,16 @@ impl ActivityPubService {
avatar_url: None, avatar_url: None,
outbox_url: None, outbox_url: None,
}; };
data.follow_repo.add_following(local_user_id, target_as_remote, &follow_id).await?; data.follow_repo
data.follow_repo.update_following_status(local_user_id, target_actor_url.as_ref(), FollowingStatus::Accepted).await?; .add_following(local_user_id, target_as_remote, &follow_id)
.await?;
data.follow_repo
.update_following_status(
local_user_id,
target_actor_url.as_ref(),
FollowingStatus::Accepted,
)
.await?;
tracing::info!(follower = %local_user_id, followee = %target.id, "local follow"); tracing::info!(follower = %local_user_id, followee = %target.id, "local follow");
Ok(()) Ok(())
} }
@@ -219,8 +398,12 @@ impl ActivityPubService {
let target_user_id = crate::urls::extract_user_id_from_url(&target_url) let target_user_id = crate::urls::extract_user_id_from_url(&target_url)
.ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?; .ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?;
let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string();
data.follow_repo.remove_follower(target_user_id, &local_actor_url).await?; data.follow_repo
data.follow_repo.remove_following(local_user_id, target_actor_url).await?; .remove_follower(target_user_id, &local_actor_url)
.await?;
data.follow_repo
.remove_following(local_user_id, target_actor_url)
.await?;
tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow"); tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow");
Ok(()) Ok(())
} }

View File

@@ -1,9 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use activitypub_federation::{ use activitypub_federation::{protocol::context::WithContext, traits::Object};
protocol::context::WithContext,
traits::Object,
};
use axum::{Router, extract::DefaultBodyLimit, routing::get, routing::post}; use axum::{Router, extract::DefaultBodyLimit, routing::get, routing::post};
use url::Url; use url::Url;
@@ -18,10 +15,8 @@ use crate::{
nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler},
outbox::outbox_handler, outbox::outbox_handler,
repository::{ repository::{
ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository,
FollowRepository, FollowerStatus, FollowingStatus, RemoteActor,
}, },
urls::activity_url,
user::ApUserRepository, user::ApUserRepository,
webfinger::webfinger_handler, webfinger::webfinger_handler,
}; };
@@ -67,54 +62,91 @@ pub struct ActivityPubServiceBuilder {
impl ActivityPubServiceBuilder { impl ActivityPubServiceBuilder {
pub fn activity_repo(mut self, v: Arc<dyn ActivityRepository>) -> Self { pub fn activity_repo(mut self, v: Arc<dyn ActivityRepository>) -> Self {
self.activity_repo = Some(v); self self.activity_repo = Some(v);
self
} }
pub fn follow_repo(mut self, v: Arc<dyn FollowRepository>) -> Self { pub fn follow_repo(mut self, v: Arc<dyn FollowRepository>) -> Self {
self.follow_repo = Some(v); self self.follow_repo = Some(v);
self
} }
pub fn actor_repo(mut self, v: Arc<dyn ActorRepository>) -> Self { pub fn actor_repo(mut self, v: Arc<dyn ActorRepository>) -> Self {
self.actor_repo = Some(v); self self.actor_repo = Some(v);
self
} }
pub fn blocklist_repo(mut self, v: Arc<dyn BlocklistRepository>) -> Self { pub fn blocklist_repo(mut self, v: Arc<dyn BlocklistRepository>) -> Self {
self.blocklist_repo = Some(v); self self.blocklist_repo = Some(v);
self
} }
pub fn user_repo(mut self, v: Arc<dyn ApUserRepository>) -> Self { pub fn user_repo(mut self, v: Arc<dyn ApUserRepository>) -> Self {
self.user_repo = Some(v); self self.user_repo = Some(v);
self
} }
pub fn content_reader(mut self, v: Arc<dyn ApContentReader>) -> Self { pub fn content_reader(mut self, v: Arc<dyn ApContentReader>) -> Self {
self.content_reader = Some(v); self self.content_reader = Some(v);
self
} }
pub fn object_handler(mut self, v: Arc<dyn ApObjectHandler>) -> Self { pub fn object_handler(mut self, v: Arc<dyn ApObjectHandler>) -> Self {
self.object_handler = Some(v); self self.object_handler = Some(v);
self
}
pub fn allow_registration(mut self, v: bool) -> Self {
self.allow_registration = v;
self
}
pub fn software_name(mut self, v: impl Into<String>) -> Self {
self.software_name = v.into();
self
}
pub fn debug(mut self, v: bool) -> Self {
self.debug = v;
self
} }
pub fn allow_registration(mut self, v: bool) -> Self { self.allow_registration = v; self }
pub fn software_name(mut self, v: impl Into<String>) -> Self { self.software_name = v.into(); self }
pub fn debug(mut self, v: bool) -> Self { self.debug = v; self }
pub fn event_publisher(mut self, v: Arc<dyn crate::data::EventPublisher>) -> Self { pub fn event_publisher(mut self, v: Arc<dyn crate::data::EventPublisher>) -> Self {
self.event_publisher = Some(v); self self.event_publisher = Some(v);
self
}
pub fn delivery_max_attempts(mut self, v: u32) -> Self {
self.delivery_max_attempts = v;
self
}
pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self {
self.delivery_initial_delay_secs = v;
self
} }
pub fn delivery_max_attempts(mut self, v: u32) -> Self { self.delivery_max_attempts = v; self }
pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { self.delivery_initial_delay_secs = v; self }
pub async fn build(self) -> anyhow::Result<ActivityPubService> { pub async fn build(self) -> anyhow::Result<ActivityPubService> {
let activity_repo = self.activity_repo let activity_repo = self
.activity_repo
.ok_or_else(|| anyhow::anyhow!("activity_repo required — call .activity_repo(arc)"))?; .ok_or_else(|| anyhow::anyhow!("activity_repo required — call .activity_repo(arc)"))?;
let follow_repo = self.follow_repo let follow_repo = self
.follow_repo
.ok_or_else(|| anyhow::anyhow!("follow_repo required — call .follow_repo(arc)"))?; .ok_or_else(|| anyhow::anyhow!("follow_repo required — call .follow_repo(arc)"))?;
let actor_repo = self.actor_repo let actor_repo = self
.actor_repo
.ok_or_else(|| anyhow::anyhow!("actor_repo required — call .actor_repo(arc)"))?; .ok_or_else(|| anyhow::anyhow!("actor_repo required — call .actor_repo(arc)"))?;
let blocklist_repo = self.blocklist_repo let blocklist_repo = self.blocklist_repo.ok_or_else(|| {
.ok_or_else(|| anyhow::anyhow!("blocklist_repo required — call .blocklist_repo(arc)"))?; anyhow::anyhow!("blocklist_repo required — call .blocklist_repo(arc)")
let user_repo = self.user_repo })?;
let user_repo = self
.user_repo
.ok_or_else(|| anyhow::anyhow!("user_repo required — call .user_repo(arc)"))?; .ok_or_else(|| anyhow::anyhow!("user_repo required — call .user_repo(arc)"))?;
let content_reader = self.content_reader let content_reader = self.content_reader.ok_or_else(|| {
.ok_or_else(|| anyhow::anyhow!("content_reader required — call .content_reader(arc)"))?; anyhow::anyhow!("content_reader required — call .content_reader(arc)")
let object_handler = self.object_handler })?;
.ok_or_else(|| anyhow::anyhow!("object_handler required — call .object_handler(arc)"))?; let object_handler = self.object_handler.ok_or_else(|| {
anyhow::anyhow!("object_handler required — call .object_handler(arc)")
})?;
let data = FederationData::new( let data = FederationData::new(
activity_repo, follow_repo, actor_repo, blocklist_repo, activity_repo,
user_repo, content_reader, object_handler, follow_repo,
self.base_url.clone(), self.allow_registration, self.software_name, actor_repo,
blocklist_repo,
user_repo,
content_reader,
object_handler,
self.base_url.clone(),
self.allow_registration,
self.software_name,
self.event_publisher, self.event_publisher,
); );
let federation_config = ApFederationConfig::new(data, self.debug).await?; let federation_config = ApFederationConfig::new(data, self.debug).await?;
@@ -147,9 +179,15 @@ impl ActivityPubService {
} }
} }
pub fn federation_config(&self) -> &ApFederationConfig { &self.federation_config } pub fn federation_config(&self) -> &ApFederationConfig {
pub fn request_data(&self) -> activitypub_federation::config::Data<FederationData> { self.federation_config.to_request_data() } &self.federation_config
pub fn base_url(&self) -> &str { &self.base_url } }
pub fn request_data(&self) -> activitypub_federation::config::Data<FederationData> {
self.federation_config.to_request_data()
}
pub fn base_url(&self) -> &str {
&self.base_url
}
/// Returns the ActivityPub router. Inbox routes enforce a 1 MB body limit. /// Returns the ActivityPub router. Inbox routes enforce a 1 MB body limit.
pub fn router<S>(&self) -> Router<S> pub fn router<S>(&self) -> Router<S>
@@ -160,9 +198,15 @@ impl ActivityPubService {
.route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler)) .route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler))
.route("/nodeinfo/2.0", get(nodeinfo_handler)) .route("/nodeinfo/2.0", get(nodeinfo_handler))
.route("/.well-known/webfinger", get(webfinger_handler)) .route("/.well-known/webfinger", get(webfinger_handler))
.route("/inbox", post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024))) .route(
"/inbox",
post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024)),
)
.route("/users/{id}", get(actor_handler)) .route("/users/{id}", get(actor_handler))
.route("/users/{id}/inbox", post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024))) .route(
"/users/{id}/inbox",
post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024)),
)
.route("/users/{id}/outbox", get(outbox_handler)) .route("/users/{id}/outbox", get(outbox_handler))
.route("/users/{id}/followers", get(followers_handler)) .route("/users/{id}/followers", get(followers_handler))
.route("/users/{id}/following", get(following_handler)) .route("/users/{id}/following", get(following_handler))
@@ -172,12 +216,24 @@ impl ActivityPubService {
pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result<String> { pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result<String> {
let uuid = uuid::Uuid::parse_str(user_id_str)?; let uuid = uuid::Uuid::parse_str(user_id_str)?;
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let actor = get_local_actor(uuid, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let actor = get_local_actor(uuid, &data)
let person = actor.into_json(&data).await.map_err(|e| anyhow::anyhow!("{e}"))?; .await
Ok(serde_json::to_string(&WithContext::new(person, crate::urls::actor_ap_context()))?) .map_err(|e| anyhow::anyhow!("{e}"))?;
let person = actor
.into_json(&data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
Ok(serde_json::to_string(&WithContext::new(
person,
crate::urls::actor_ap_context(),
))?)
} }
pub async fn followers_collection_json(&self, user_id: uuid::Uuid, page: Option<u32>) -> anyhow::Result<String> { pub async fn followers_collection_json(
&self,
user_id: uuid::Uuid,
page: Option<u32>,
) -> anyhow::Result<String> {
const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
const PAGE_SIZE: usize = 20; const PAGE_SIZE: usize = 20;
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
@@ -186,11 +242,16 @@ impl ActivityPubService {
let obj = if let Some(p) = page { let obj = if let Some(p) = page {
let p = p.max(1); let p = p.max(1);
let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE;
let followers = data.follow_repo.get_followers_page(user_id, offset as u32, PAGE_SIZE).await?; let followers = data
.follow_repo
.get_followers_page(user_id, offset as u32, PAGE_SIZE)
.await?;
let has_next = offset + followers.len() < total; let has_next = offset + followers.len() < total;
let items: Vec<String> = followers.into_iter().map(|f| f.actor.url).collect(); let items: Vec<String> = followers.into_iter().map(|f| f.actor.url).collect();
let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items});
if has_next { obj["next"] = serde_json::json!(format!("{}?page={}",collection_id,p+1)); } if has_next {
obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1));
}
obj obj
} else { } else {
serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollection","id":collection_id,"totalItems":total,"first":format!("{}?page=1",collection_id)}) serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollection","id":collection_id,"totalItems":total,"first":format!("{}?page=1",collection_id)})
@@ -198,7 +259,11 @@ impl ActivityPubService {
Ok(serde_json::to_string(&obj)?) Ok(serde_json::to_string(&obj)?)
} }
pub async fn following_collection_json(&self, user_id: uuid::Uuid, page: Option<u32>) -> anyhow::Result<String> { pub async fn following_collection_json(
&self,
user_id: uuid::Uuid,
page: Option<u32>,
) -> anyhow::Result<String> {
const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
const PAGE_SIZE: usize = 20; const PAGE_SIZE: usize = 20;
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
@@ -207,11 +272,16 @@ impl ActivityPubService {
let obj = if let Some(p) = page { let obj = if let Some(p) = page {
let p = p.max(1); let p = p.max(1);
let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE;
let following = data.follow_repo.get_following_page(user_id, offset as u32, PAGE_SIZE).await?; let following = data
.follow_repo
.get_following_page(user_id, offset as u32, PAGE_SIZE)
.await?;
let has_next = offset + following.len() < total; let has_next = offset + following.len() < total;
let items: Vec<String> = following.into_iter().map(|a| a.url).collect(); let items: Vec<String> = following.into_iter().map(|a| a.url).collect();
let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items});
if has_next { obj["next"] = serde_json::json!(format!("{}?page={}",collection_id,p+1)); } if has_next {
obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1));
}
obj obj
} else { } else {
serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollection","id":collection_id,"totalItems":total,"first":format!("{}?page=1",collection_id)}) serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollection","id":collection_id,"totalItems":total,"first":format!("{}?page=1",collection_id)})
@@ -219,35 +289,67 @@ impl ActivityPubService {
Ok(serde_json::to_string(&obj)?) Ok(serde_json::to_string(&obj)?)
} }
pub async fn mark_follower_accepted(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn mark_follower_accepted(
&self,
user_id: uuid::Uuid,
actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.follow_repo.update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted).await.map_err(|e| anyhow::anyhow!("{e}")) data.follow_repo
.update_follower_status(
user_id,
actor_url,
crate::repository::FollowerStatus::Accepted,
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))
} }
pub async fn mark_follower_rejected(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn mark_follower_rejected(
&self,
user_id: uuid::Uuid,
actor_url: &str,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.follow_repo.remove_follower(user_id, actor_url).await.map_err(|e| anyhow::anyhow!("{e}")) data.follow_repo
.remove_follower(user_id, actor_url)
.await
.map_err(|e| anyhow::anyhow!("{e}"))
} }
pub async fn lookup_actor_by_handle(&self, handle: &str) -> anyhow::Result<crate::user::LookedUpActor> { pub async fn lookup_actor_by_handle(
&self,
handle: &str,
) -> anyhow::Result<crate::user::LookedUpActor> {
tracing::info!(handle, "looking up remote actor"); tracing::info!(handle, "looking up remote actor");
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let actor = self.webfinger_https(handle, &data).await let actor = self
.webfinger_https(handle, &data)
.await
.inspect_err(|e| tracing::warn!(handle, error = %e, "actor lookup failed"))?; .inspect_err(|e| tracing::warn!(handle, error = %e, "actor lookup failed"))?;
let domain = actor.ap_id.host_str().unwrap_or("").to_string(); let domain = actor.ap_id.host_str().unwrap_or("").to_string();
tracing::info!(handle = format!("{}@{}", actor.username, domain), ap_url = %actor.ap_id, "remote actor resolved"); tracing::info!(handle = format!("{}@{}", actor.username, domain), ap_url = %actor.ap_id, "remote actor resolved");
Ok(crate::user::LookedUpActor { Ok(crate::user::LookedUpActor {
handle: format!("{}@{}", actor.username, domain), handle: format!("{}@{}", actor.username, domain),
display_name: actor.display_name, bio: actor.bio, display_name: actor.display_name,
avatar_url: actor.avatar_url, banner_url: actor.banner_url, bio: actor.bio,
ap_url: actor.ap_id, outbox_url: Some(actor.outbox_url), avatar_url: actor.avatar_url,
followers_url: Some(actor.followers_url), following_url: Some(actor.following_url), banner_url: actor.banner_url,
also_known_as: actor.also_known_as, profile_url: actor.profile_url, ap_url: actor.ap_id,
outbox_url: Some(actor.outbox_url),
followers_url: Some(actor.followers_url),
following_url: Some(actor.following_url),
also_known_as: actor.also_known_as,
profile_url: actor.profile_url,
attachment: actor.attachment, attachment: actor.attachment,
}) })
} }
pub async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()> { pub async fn add_blocked_domain(
&self,
domain: &str,
reason: Option<&str>,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.blocklist_repo.add_blocked_domain(domain, reason).await data.blocklist_repo.add_blocked_domain(domain, reason).await
} }
@@ -269,13 +371,22 @@ impl ActivityPubService {
data: &activitypub_federation::config::Data<FederationData>, data: &activitypub_federation::config::Data<FederationData>,
local_user_id: uuid::Uuid, local_user_id: uuid::Uuid,
) -> anyhow::Result<Option<(DbActor, Vec<Url>)>> { ) -> anyhow::Result<Option<(DbActor, Vec<Url>)>> {
let local_actor = get_local_actor(local_user_id, data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, data)
let inbox_strs = data.follow_repo.get_accepted_follower_inboxes(local_user_id).await?; .await
if inbox_strs.is_empty() { return Ok(None); } .map_err(|e| anyhow::anyhow!("{e}"))?;
let inbox_strs = data
.follow_repo
.get_accepted_follower_inboxes(local_user_id)
.await?;
if inbox_strs.is_empty() {
return Ok(None);
}
let inboxes: Vec<Url> = inbox_strs.into_iter().filter_map(|s| { let inboxes: Vec<Url> = inbox_strs.into_iter().filter_map(|s| {
Url::parse(&s).map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")).ok() Url::parse(&s).map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")).ok()
}).collect(); }).collect();
if inboxes.is_empty() { return Ok(None); } if inboxes.is_empty() {
return Ok(None);
}
Ok(Some((local_actor, inboxes))) Ok(Some((local_actor, inboxes)))
} }
@@ -285,20 +396,39 @@ impl ActivityPubService {
data: &activitypub_federation::config::Data<FederationData>, data: &activitypub_federation::config::Data<FederationData>,
) -> anyhow::Result<DbActor> { ) -> anyhow::Result<DbActor> {
let normalized = handle.trim_start_matches('@'); let normalized = handle.trim_start_matches('@');
let at = normalized.rfind('@').ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?; let at = normalized
.rfind('@')
.ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?;
let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]); let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]);
let wf_url = format!("https://{}/.well-known/webfinger?resource=acct:{}@{}", domain_str, user, domain_str); let wf_url = format!(
"https://{}/.well-known/webfinger?resource=acct:{}@{}",
domain_str, user, domain_str
);
tracing::debug!(handle, wf_url, "resolving webfinger"); tracing::debug!(handle, wf_url, "resolving webfinger");
let wf: serde_json::Value = reqwest::Client::new().get(&wf_url) let wf: serde_json::Value = reqwest::Client::new()
.get(&wf_url)
.header("Accept", "application/jrd+json, application/json") .header("Accept", "application/jrd+json, application/json")
.send().await?.json().await?; .send()
let self_href = wf["links"].as_array() .await?
.and_then(|links| links.iter().find(|l| l["rel"].as_str() == Some("self") && l["type"].as_str() == Some("application/activity+json"))) .json()
.await?;
let self_href = wf["links"]
.as_array()
.and_then(|links| {
links.iter().find(|l| {
l["rel"].as_str() == Some("self")
&& l["type"].as_str() == Some("application/activity+json")
})
})
.and_then(|l| l["href"].as_str()) .and_then(|l| l["href"].as_str())
.ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))?.to_owned(); .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))?
.to_owned();
tracing::debug!(handle, self_href, "webfinger resolved, fetching actor"); tracing::debug!(handle, self_href, "webfinger resolved, fetching actor");
let actor: DbActor = activitypub_federation::fetch::object_id::ObjectId::from(url::Url::parse(&self_href)?) let actor: DbActor =
.dereference(data).await.map_err(|e| anyhow::anyhow!("{e}"))?; activitypub_federation::fetch::object_id::ObjectId::from(url::Url::parse(&self_href)?)
.dereference(data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
Ok(actor) Ok(actor)
} }
} }

View File

@@ -1,6 +1,5 @@
// src/tests/integration.rs // src/tests/integration.rs
/// Integration tests with in-memory trait stubs. /// Integration tests with in-memory trait stubs.
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
@@ -12,8 +11,8 @@ use url::Url;
use crate::content::{ApContentReader, ApObjectHandler}; use crate::content::{ApContentReader, ApObjectHandler};
use crate::data::FederationData; use crate::data::FederationData;
use crate::repository::{ use crate::repository::{
ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository,
Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor, Follower, FollowerStatus, FollowingStatus, RemoteActor,
}; };
use crate::user::{ApActorType, ApProfileField, ApUser, ApUserRepository}; use crate::user::{ApActorType, ApProfileField, ApUser, ApUserRepository};
@@ -42,24 +41,98 @@ struct MemFollowRepo;
#[async_trait] #[async_trait]
impl FollowRepository for MemFollowRepo { impl FollowRepository for MemFollowRepo {
async fn add_follower(&self, _: uuid::Uuid, _: &str, _: FollowerStatus, _: &str) -> anyhow::Result<()> { Ok(()) } async fn add_follower(
async fn get_follower_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) } &self,
async fn remove_follower(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } _: uuid::Uuid,
async fn get_followers(&self, _: uuid::Uuid) -> anyhow::Result<Vec<Follower>> { Ok(vec![]) } _: &str,
async fn get_followers_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result<Vec<Follower>> { Ok(vec![]) } _: FollowerStatus,
async fn count_followers(&self, _: uuid::Uuid) -> anyhow::Result<usize> { Ok(0) } _: &str,
async fn update_follower_status(&self, _: uuid::Uuid, _: &str, _: FollowerStatus) -> anyhow::Result<()> { Ok(()) } ) -> anyhow::Result<()> {
async fn get_pending_followers(&self, _: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) } Ok(())
async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> { Ok(vec![]) } }
async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> { Ok(()) } async fn get_follower_follow_activity_id(
async fn get_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) } &self,
async fn remove_following(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } _: uuid::Uuid,
async fn get_following(&self, _: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) } _: &str,
async fn get_following_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) } ) -> anyhow::Result<Option<String>> {
async fn count_following(&self, _: uuid::Uuid) -> anyhow::Result<usize> { Ok(0) } Ok(None)
async fn update_following_status(&self, _: uuid::Uuid, _: &str, _: FollowingStatus) -> anyhow::Result<()> { Ok(()) } }
async fn get_following_outbox_url(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) } async fn remove_follower(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> {
async fn migrate_follower_actor(&self, _: &str, _: &str) -> anyhow::Result<Vec<uuid::Uuid>> { Ok(vec![]) } Ok(())
}
async fn get_followers(&self, _: uuid::Uuid) -> anyhow::Result<Vec<Follower>> {
Ok(vec![])
}
async fn get_followers_page(
&self,
_: uuid::Uuid,
_: u32,
_: usize,
) -> anyhow::Result<Vec<Follower>> {
Ok(vec![])
}
async fn count_followers(&self, _: uuid::Uuid) -> anyhow::Result<usize> {
Ok(0)
}
async fn update_follower_status(
&self,
_: uuid::Uuid,
_: &str,
_: FollowerStatus,
) -> anyhow::Result<()> {
Ok(())
}
async fn get_pending_followers(&self, _: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
Ok(vec![])
}
async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> {
Ok(vec![])
}
async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> {
Ok(())
}
async fn get_follow_activity_id(
&self,
_: uuid::Uuid,
_: &str,
) -> anyhow::Result<Option<String>> {
Ok(None)
}
async fn remove_following(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> {
Ok(())
}
async fn get_following(&self, _: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
Ok(vec![])
}
async fn get_following_page(
&self,
_: uuid::Uuid,
_: u32,
_: usize,
) -> anyhow::Result<Vec<RemoteActor>> {
Ok(vec![])
}
async fn count_following(&self, _: uuid::Uuid) -> anyhow::Result<usize> {
Ok(0)
}
async fn update_following_status(
&self,
_: uuid::Uuid,
_: &str,
_: FollowingStatus,
) -> anyhow::Result<()> {
Ok(())
}
async fn get_following_outbox_url(
&self,
_: uuid::Uuid,
_: &str,
) -> anyhow::Result<Option<String>> {
Ok(None)
}
async fn migrate_follower_actor(&self, _: &str, _: &str) -> anyhow::Result<Vec<uuid::Uuid>> {
Ok(vec![])
}
} }
// ── ActorRepository ─────────────────────────────────────────────────────────── // ── ActorRepository ───────────────────────────────────────────────────────────
@@ -69,12 +142,38 @@ struct MemActorRepo;
#[async_trait] #[async_trait]
impl ActorRepository for MemActorRepo { impl ActorRepository for MemActorRepo {
async fn get_local_actor_keypair(&self, _: uuid::Uuid) -> anyhow::Result<Option<(String, String)>> { Ok(None) } async fn get_local_actor_keypair(
async fn save_local_actor_keypair(&self, _: uuid::Uuid, _: String, _: String) -> anyhow::Result<()> { Ok(()) } &self,
async fn upsert_remote_actor(&self, _: RemoteActor) -> anyhow::Result<()> { Ok(()) } _: uuid::Uuid,
async fn get_remote_actor(&self, _: &str) -> anyhow::Result<Option<RemoteActor>> { Ok(None) } ) -> anyhow::Result<Option<(String, String)>> {
async fn add_announce(&self, _: &str, _: &str, _: &str, _: DateTime<Utc>) -> anyhow::Result<()> { Ok(()) } Ok(None)
async fn count_announces(&self, _: &str) -> anyhow::Result<usize> { Ok(0) } }
async fn save_local_actor_keypair(
&self,
_: uuid::Uuid,
_: String,
_: String,
) -> anyhow::Result<()> {
Ok(())
}
async fn upsert_remote_actor(&self, _: RemoteActor) -> anyhow::Result<()> {
Ok(())
}
async fn get_remote_actor(&self, _: &str) -> anyhow::Result<Option<RemoteActor>> {
Ok(None)
}
async fn add_announce(
&self,
_: &str,
_: &str,
_: &str,
_: DateTime<Utc>,
) -> anyhow::Result<()> {
Ok(())
}
async fn count_announces(&self, _: &str) -> anyhow::Result<usize> {
Ok(0)
}
} }
// ── BlocklistRepository ─────────────────────────────────────────────────────── // ── BlocklistRepository ───────────────────────────────────────────────────────
@@ -85,13 +184,17 @@ struct MemBlocklistRepo {
impl MemBlocklistRepo { impl MemBlocklistRepo {
fn with_blocked_domains(domains: impl IntoIterator<Item = String>) -> Self { fn with_blocked_domains(domains: impl IntoIterator<Item = String>) -> Self {
Self { blocked_domains: Mutex::new(domains.into_iter().collect()) } Self {
blocked_domains: Mutex::new(domains.into_iter().collect()),
}
} }
} }
impl Default for MemBlocklistRepo { impl Default for MemBlocklistRepo {
fn default() -> Self { fn default() -> Self {
Self { blocked_domains: Mutex::new(HashSet::new()) } Self {
blocked_domains: Mutex::new(HashSet::new()),
}
} }
} }
@@ -105,14 +208,24 @@ impl BlocklistRepository for MemBlocklistRepo {
self.blocked_domains.lock().await.remove(domain); self.blocked_domains.lock().await.remove(domain);
Ok(()) Ok(())
} }
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> { Ok(vec![]) } async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> {
Ok(vec![])
}
async fn is_domain_blocked(&self, domain: &str) -> anyhow::Result<bool> { async fn is_domain_blocked(&self, domain: &str) -> anyhow::Result<bool> {
Ok(self.blocked_domains.lock().await.contains(domain)) Ok(self.blocked_domains.lock().await.contains(domain))
} }
async fn add_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn add_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> {
async fn remove_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } Ok(())
async fn get_blocked_actors(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> { Ok(vec![]) } }
async fn is_actor_blocked(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<bool> { Ok(false) } async fn remove_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> {
Ok(())
}
async fn get_blocked_actors(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> {
Ok(vec![])
}
async fn is_actor_blocked(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<bool> {
Ok(false)
}
} }
// ── ApUserRepository ────────────────────────────────────────────────────────── // ── ApUserRepository ──────────────────────────────────────────────────────────
@@ -124,7 +237,9 @@ struct MemUserRepo {
impl MemUserRepo { impl MemUserRepo {
fn with_user(id: uuid::Uuid, username: &str) -> Self { fn with_user(id: uuid::Uuid, username: &str) -> Self {
let mut users = HashMap::new(); let mut users = HashMap::new();
users.insert(id, ApUser { users.insert(
id,
ApUser {
id, id,
username: username.to_string(), username: username.to_string(),
display_name: None, display_name: None,
@@ -138,7 +253,8 @@ impl MemUserRepo {
discoverable: true, discoverable: true,
actor_type: ApActorType::Person, actor_type: ApActorType::Person,
featured_url: None, featured_url: None,
}); },
);
Self { users } Self { users }
} }
} }
@@ -149,9 +265,15 @@ impl ApUserRepository for MemUserRepo {
Ok(self.users.get(&id).cloned()) Ok(self.users.get(&id).cloned())
} }
async fn find_by_username(&self, username: &str) -> anyhow::Result<Option<ApUser>> { async fn find_by_username(&self, username: &str) -> anyhow::Result<Option<ApUser>> {
Ok(self.users.values().find(|u| u.username == username).cloned()) Ok(self
.users
.values()
.find(|u| u.username == username)
.cloned())
}
async fn count_users(&self) -> anyhow::Result<usize> {
Ok(self.users.len())
} }
async fn count_users(&self) -> anyhow::Result<usize> { Ok(self.users.len()) }
} }
// ── ApContentReader ─────────────────────────────────────────────────────────── // ── ApContentReader ───────────────────────────────────────────────────────────
@@ -161,9 +283,23 @@ struct MemContentReader;
#[async_trait] #[async_trait]
impl ApContentReader for MemContentReader { impl ApContentReader for MemContentReader {
async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result<Vec<(Url, serde_json::Value)>> { Ok(vec![]) } async fn get_local_objects_for_user(
async fn get_local_objects_page(&self, _: uuid::Uuid, _: Option<DateTime<Utc>>, _: usize) -> anyhow::Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>> { Ok(vec![]) } &self,
async fn count_local_posts(&self) -> anyhow::Result<u64> { Ok(0) } _: uuid::Uuid,
) -> anyhow::Result<Vec<(Url, serde_json::Value)>> {
Ok(vec![])
}
async fn get_local_objects_page(
&self,
_: uuid::Uuid,
_: Option<DateTime<Utc>>,
_: usize,
) -> anyhow::Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>> {
Ok(vec![])
}
async fn count_local_posts(&self) -> anyhow::Result<u64> {
Ok(0)
}
} }
// ── ApObjectHandler ─────────────────────────────────────────────────────────── // ── ApObjectHandler ───────────────────────────────────────────────────────────
@@ -180,13 +316,27 @@ impl ApObjectHandler for MemHandler {
self.creates.lock().await.push(ap_id.clone()); self.creates.lock().await.push(ap_id.clone());
Ok(()) Ok(())
} }
async fn on_update(&self, _: &Url, _: &Url, _: serde_json::Value) -> anyhow::Result<()> { Ok(()) } async fn on_update(&self, _: &Url, _: &Url, _: serde_json::Value) -> anyhow::Result<()> {
async fn on_delete(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } Ok(())
async fn on_actor_removed(&self, _: &Url) -> anyhow::Result<()> { Ok(()) } }
async fn on_like(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_delete(&self, _: &Url, _: &Url) -> anyhow::Result<()> {
async fn on_unlike(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } Ok(())
async fn on_announce_received(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } }
async fn on_announce_of_remote(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_actor_removed(&self, _: &Url) -> anyhow::Result<()> {
Ok(())
}
async fn on_like(&self, _: &Url, _: &Url) -> anyhow::Result<()> {
Ok(())
}
async fn on_unlike(&self, _: &Url, _: &Url) -> anyhow::Result<()> {
Ok(())
}
async fn on_announce_received(&self, _: &Url, _: &Url) -> anyhow::Result<()> {
Ok(())
}
async fn on_announce_of_remote(&self, _: &Url, _: &Url) -> anyhow::Result<()> {
Ok(())
}
async fn on_mention(&self, ap_id: &Url, user_id: uuid::Uuid, _: &Url) -> anyhow::Result<()> { async fn on_mention(&self, ap_id: &Url, user_id: uuid::Uuid, _: &Url) -> anyhow::Result<()> {
self.mentions.lock().await.push((ap_id.clone(), user_id)); self.mentions.lock().await.push((ap_id.clone(), user_id));
Ok(()) Ok(())
@@ -264,9 +414,9 @@ async fn check_guards_blocks_domain() {
use crate::activities::helpers::check_guards; use crate::activities::helpers::check_guards;
use activitypub_federation::config::FederationConfig; use activitypub_federation::config::FederationConfig;
let blocklist_repo = Arc::new(MemBlocklistRepo::with_blocked_domains( let blocklist_repo = Arc::new(MemBlocklistRepo::with_blocked_domains([
["spam.example".to_string()], "spam.example".to_string()
)); ]));
let data_inner = make_data( let data_inner = make_data(
Arc::new(MemActivityRepo::default()), Arc::new(MemActivityRepo::default()),
Arc::new(MemFollowRepo), Arc::new(MemFollowRepo),

View File

@@ -26,7 +26,9 @@ pub enum ApVisibility {
/// Actor type for AP serialization. Defaults to `Person`. /// Actor type for AP serialization. Defaults to `Person`.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Default)]
pub enum ApActorType { pub enum ApActorType {
#[default]
Person, Person,
Service, Service,
Application, Application,
@@ -34,11 +36,6 @@ pub enum ApActorType {
Group, Group,
} }
impl Default for ApActorType {
fn default() -> Self {
Self::Person
}
}
/// Resolved actor data returned by [`crate::service::ActivityPubService::lookup_actor_by_handle`]. /// Resolved actor data returned by [`crate::service::ActivityPubService::lookup_actor_by_handle`].
/// Fetched via a signed HTTP request so strict instances (e.g. Threads) return full data. /// Fetched via a signed HTTP request so strict instances (e.g. Threads) return full data.

View File

@@ -1,7 +1,4 @@
use activitypub_federation::{ use activitypub_federation::{config::Data, fetch::webfinger::extract_webfinger_name};
config::Data,
fetch::webfinger::{extract_webfinger_name},
};
use axum::{ use axum::{
extract::Query, extract::Query,
http::header, http::header,