diff --git a/src/activities/accept.rs b/src/activities/accept.rs index 2f06753..7571809 100644 --- a/src/activities/accept.rs +++ b/src/activities/accept.rs @@ -1,8 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::AcceptType, - traits::Activity, + config::Data, fetch::object_id::ObjectId, kinds::activity::AcceptType, traits::Activity, }; use serde::{Deserialize, Serialize}; use url::Url; @@ -30,12 +27,18 @@ impl Activity for AcceptActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { if self.actor.inner() != self.object.object.inner() { - return Err(Error::bad_request(anyhow::anyhow!("Accept actor does not match Follow target"))); + return Err(Error::bad_request(anyhow::anyhow!( + "Accept actor does not match Follow target" + ))); } Ok(()) } diff --git a/src/activities/add.rs b/src/activities/add.rs index 0704e96..008f3f0 100644 --- a/src/activities/add.rs +++ b/src/activities/add.rs @@ -1,8 +1,4 @@ -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - traits::Activity, -}; +use activitypub_federation::{config::Data, fetch::object_id::ObjectId, traits::Activity}; use serde::{Deserialize, Serialize}; use url::Url; @@ -35,8 +31,12 @@ impl Activity for AddActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) diff --git a/src/activities/announce.rs b/src/activities/announce.rs index cd199ef..b023ee4 100644 --- a/src/activities/announce.rs +++ b/src/activities/announce.rs @@ -1,7 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - protocol::verification::verify_domains_match, + config::Data, fetch::object_id::ObjectId, protocol::verification::verify_domains_match, traits::Activity, }; use serde::{Deserialize, Serialize}; @@ -37,8 +35,12 @@ impl Activity for AnnounceActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { verify_domains_match(&self.id, self.actor.inner())?; @@ -53,7 +55,9 @@ impl Activity for AnnounceActivity { data.object_handler .on_announce_of_remote(&self.object, self.actor.inner()) .await - .unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process cross-server announce")); + .unwrap_or_else( + |e| tracing::warn!(error = %e, "failed to process cross-server announce"), + ); tracing::debug!(actor = %self.actor.inner(), object = %self.object, "received Announce of non-local object"); return Ok(()); } @@ -68,7 +72,9 @@ impl Activity for AnnounceActivity { data.object_handler .on_announce_received(&self.object, self.actor.inner()) .await - .unwrap_or_else(|e| tracing::warn!(error = %e, "failed to process announce notification")); + .unwrap_or_else( + |e| tracing::warn!(error = %e, "failed to process announce notification"), + ); tracing::info!(actor = %self.actor.inner(), object = %self.object, "received announce"); Ok(()) } diff --git a/src/activities/block.rs b/src/activities/block.rs index 870441c..e5e990e 100644 --- a/src/activities/block.rs +++ b/src/activities/block.rs @@ -1,7 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - protocol::verification::verify_domains_match, + config::Data, fetch::object_id::ObjectId, protocol::verification::verify_domains_match, traits::Activity, }; use serde::{Deserialize, Serialize}; @@ -32,8 +30,12 @@ impl Activity for BlockActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { verify_domains_match(&self.id, self.actor.inner())?; @@ -45,8 +47,14 @@ impl Activity for BlockActivity { return Ok(()); } if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { - let _ = data.follow_repo.remove_following(local_user_id, self.actor.inner().as_str()).await; - let _ = data.follow_repo.remove_follower(local_user_id, self.actor.inner().as_str()).await; + let _ = data + .follow_repo + .remove_following(local_user_id, self.actor.inner().as_str()) + .await; + let _ = data + .follow_repo + .remove_follower(local_user_id, self.actor.inner().as_str()) + .await; } tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower"); Ok(()) diff --git a/src/activities/create.rs b/src/activities/create.rs index 9fcc132..8a677df 100644 --- a/src/activities/create.rs +++ b/src/activities/create.rs @@ -1,8 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::CreateType, - traits::Activity, + config::Data, fetch::object_id::ObjectId, kinds::activity::CreateType, traits::Activity, }; use serde::{Deserialize, Serialize}; use url::Url; @@ -36,8 +33,12 @@ impl Activity for CreateActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) diff --git a/src/activities/delete.rs b/src/activities/delete.rs index 091c8c1..9093dfb 100644 --- a/src/activities/delete.rs +++ b/src/activities/delete.rs @@ -1,8 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::DeleteType, - traits::Activity, + config::Data, fetch::object_id::ObjectId, kinds::activity::DeleteType, traits::Activity, }; use serde::{Deserialize, Serialize}; use url::Url; @@ -32,8 +29,12 @@ impl Activity for DeleteActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { let actor_domain = self.actor.inner().host_str().unwrap_or(""); diff --git a/src/activities/follow.rs b/src/activities/follow.rs index e7fd76b..08793a0 100644 --- a/src/activities/follow.rs +++ b/src/activities/follow.rs @@ -1,8 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::FollowType, - traits::Activity, + config::Data, fetch::object_id::ObjectId, kinds::activity::FollowType, traits::Activity, }; use serde::{Deserialize, Serialize}; use url::Url; @@ -29,26 +26,42 @@ impl Activity for FollowActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, data: &Data) -> Result<(), Self::Error> { let target_url = self.object.inner(); let target_domain = match (target_url.host_str(), target_url.port()) { (Some(host), Some(port)) => format!("{}:{}", host, port), (Some(host), None) => host.to_string(), - _ => return Err(Error::bad_request(anyhow::anyhow!("invalid follow target URL"))), + _ => { + return Err(Error::bad_request(anyhow::anyhow!( + "invalid follow target URL" + ))); + } }; if target_domain == data.domain { return Ok(()); } - if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url) { - if data.user_repo.find_by_id(uuid).await.ok().flatten().is_some() { + if let Some(uuid) = crate::urls::extract_user_id_from_url(target_url) + && data + .user_repo + .find_by_id(uuid) + .await + .ok() + .flatten() + .is_some() + { tracing::debug!(target = %target_url, "accepting follow for migrated actor URL"); return Ok(()); } - } - Err(Error::bad_request(anyhow::anyhow!("follow target is not a local actor"))) + Err(Error::bad_request(anyhow::anyhow!( + "follow target is not a local actor" + ))) } async fn receive(self, data: &Data) -> Result<(), Self::Error> { @@ -56,15 +69,15 @@ impl Activity for FollowActivity { return Ok(()); } // Actor block checked BEFORE any outbound HTTP fetch. - if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) { - if data.blocklist_repo + if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) + && data + .blocklist_repo .is_actor_blocked(target_user_id, self.actor.inner().as_str()) .await? { tracing::info!(actor = %self.actor.inner(), "ignoring follow from blocked actor"); return Ok(()); } - } let _follower = self.actor.dereference(data).await?; let local_actor = self.object.dereference(data).await?; data.follow_repo diff --git a/src/activities/helpers.rs b/src/activities/helpers.rs index c74bec0..d5b3c3c 100644 --- a/src/activities/helpers.rs +++ b/src/activities/helpers.rs @@ -64,7 +64,9 @@ pub(crate) async fn extract_and_dispatch_mentions( let Some(href) = tag.get("href").and_then(|v| v.as_str()) else { continue; }; - let Ok(href_url) = Url::parse(href) else { continue }; + let Ok(href_url) = Url::parse(href) else { + continue; + }; let Some(mentioned_user_id) = crate::urls::extract_user_id_from_url(&href_url) else { continue; }; diff --git a/src/activities/like.rs b/src/activities/like.rs index 2f47334..a6b47ec 100644 --- a/src/activities/like.rs +++ b/src/activities/like.rs @@ -1,7 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - protocol::verification::verify_domains_match, + config::Data, fetch::object_id::ObjectId, protocol::verification::verify_domains_match, traits::Activity, }; use serde::{Deserialize, Serialize}; @@ -18,7 +16,9 @@ use super::helpers::check_guards; pub struct LikeType; impl Default for LikeType { - fn default() -> Self { Self } + fn default() -> Self { + Self + } } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -36,8 +36,12 @@ impl Activity for LikeActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { verify_domains_match(&self.id, self.actor.inner())?; diff --git a/src/activities/move_act.rs b/src/activities/move_act.rs index 4aeb37a..3594286 100644 --- a/src/activities/move_act.rs +++ b/src/activities/move_act.rs @@ -1,9 +1,6 @@ use activitypub_federation::{ - activity_sending::SendActivityTask, - config::Data, - fetch::object_id::ObjectId, - protocol::context::WithContext, - traits::Activity, + activity_sending::SendActivityTask, config::Data, fetch::object_id::ObjectId, + protocol::context::WithContext, traits::Activity, }; use serde::{Deserialize, Serialize}; use url::Url; @@ -35,12 +32,18 @@ impl Activity for MoveActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { if &self.object != self.actor.inner() { - return Err(Error::bad_request(anyhow::anyhow!("Move object must be the actor itself"))); + return Err(Error::bad_request(anyhow::anyhow!( + "Move object must be the actor itself" + ))); } Ok(()) } @@ -67,11 +70,17 @@ impl Activity for MoveActivity { for local_user_id in &affected { let local_actor = match crate::actors::get_local_actor(*local_user_id, data).await { Ok(a) => a, - Err(e) => { tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor"); continue; } + Err(e) => { + tracing::warn!(error = %e, %local_user_id, "Move: failed to load local actor"); + continue; + } }; let follow_id = match crate::urls::activity_url(&data.base_url) { Ok(u) => u, - Err(e) => { tracing::warn!(error = %e, "Move: failed to generate follow activity URL"); continue; } + Err(e) => { + tracing::warn!(error = %e, "Move: failed to generate follow activity URL"); + continue; + } }; let follow = FollowActivity { id: follow_id, @@ -84,9 +93,14 @@ impl Activity for MoveActivity { &local_actor, vec![target.inbox_url.clone()], data, - ).await { + ) + .await + { Ok(s) => s, - Err(e) => { tracing::warn!(error = %e, "Move: failed to prepare re-follow"); continue; } + Err(e) => { + tracing::warn!(error = %e, "Move: failed to prepare re-follow"); + continue; + } }; for send in sends { if let Err(e) = send.sign_and_send(data).await { diff --git a/src/activities/reject.rs b/src/activities/reject.rs index 3d68892..bae1a3f 100644 --- a/src/activities/reject.rs +++ b/src/activities/reject.rs @@ -1,8 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::RejectType, - traits::Activity, + config::Data, fetch::object_id::ObjectId, kinds::activity::RejectType, traits::Activity, }; use serde::{Deserialize, Serialize}; use url::Url; @@ -29,12 +26,18 @@ impl Activity for RejectActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { if self.actor.inner() != self.object.object.inner() { - return Err(Error::bad_request(anyhow::anyhow!("Reject actor does not match Follow target"))); + return Err(Error::bad_request(anyhow::anyhow!( + "Reject actor does not match Follow target" + ))); } Ok(()) } diff --git a/src/activities/undo.rs b/src/activities/undo.rs index 5a60dfc..14bc559 100644 --- a/src/activities/undo.rs +++ b/src/activities/undo.rs @@ -1,8 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::UndoType, - traits::Activity, + config::Data, fetch::object_id::ObjectId, kinds::activity::UndoType, traits::Activity, }; use serde::{Deserialize, Serialize}; use url::Url; @@ -28,8 +25,12 @@ impl Activity for UndoActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { if let Some(inner_actor) = self.object.get("actor").and_then(|v| v.as_str()) @@ -46,7 +47,11 @@ impl Activity for UndoActivity { if check_guards(&self.id, self.actor.inner(), data).await? { return Ok(()); } - let obj_type = self.object.get("type").and_then(|t| t.as_str()).unwrap_or(""); + let obj_type = self + .object + .get("type") + .and_then(|t| t.as_str()) + .unwrap_or(""); match obj_type { "Follow" => { if let Some(obj_url) = self.object.get("object").and_then(|o| o.as_str()) diff --git a/src/activities/update.rs b/src/activities/update.rs index c35d1dc..d95c874 100644 --- a/src/activities/update.rs +++ b/src/activities/update.rs @@ -1,8 +1,5 @@ use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::UpdateType, - traits::Activity, + config::Data, fetch::object_id::ObjectId, kinds::activity::UpdateType, traits::Activity, }; use serde::{Deserialize, Serialize}; use url::Url; @@ -32,8 +29,12 @@ impl Activity for UpdateActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { self.actor.inner() } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + self.actor.inner() + } async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { if let Some(attributed_to) = self.object.get("attributedTo").and_then(|v| v.as_str()) diff --git a/src/actors.rs b/src/actors.rs index 04a3515..c9c09b7 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -140,11 +140,7 @@ pub async fn get_local_actor( .map_err(Error::from)? .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?; - let (public_key, private_key) = match data - .actor_repo - .get_local_actor_keypair(user_id) - .await? - { + let (public_key, private_key) = match data.actor_repo.get_local_actor_keypair(user_id).await? { Some(kp) => kp, None => { let kp = generate_actor_keypair()?; @@ -230,10 +226,7 @@ impl Object for DbActor { _ => return Ok(None), }; - let keypair = data - .actor_repo - .get_local_actor_keypair(user_id) - .await?; + let keypair = data.actor_repo.get_local_actor_keypair(user_id).await?; let (public_key, private_key) = match keypair { Some(kp) => (kp.0, Some(kp.1)), @@ -377,8 +370,14 @@ impl Object for DbActor { Url::parse(&format!("{}{}", ap_id, suffix)).unwrap_or_else(|_| ap_id.clone()) }; let outbox_url = json.outbox.clone().unwrap_or_else(|| fallback("/outbox")); - let followers_url = json.followers.clone().unwrap_or_else(|| fallback("/followers")); - let following_url = json.following.clone().unwrap_or_else(|| fallback("/following")); + let followers_url = json + .followers + .clone() + .unwrap_or_else(|| fallback("/followers")); + let following_url = json + .following + .clone() + .unwrap_or_else(|| fallback("/following")); Ok(DbActor { user_id, diff --git a/src/content.rs b/src/content.rs index 95949dd..598aace 100644 --- a/src/content.rs +++ b/src/content.rs @@ -51,17 +51,9 @@ pub trait ApObjectHandler: Send + Sync { async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - async fn on_announce_received( - &self, - object_url: &Url, - actor_url: &Url, - ) -> anyhow::Result<()>; + async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - async fn on_announce_of_remote( - &self, - object_url: &Url, - actor_url: &Url, - ) -> anyhow::Result<()>; + async fn on_announce_of_remote(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; async fn on_mention( &self, diff --git a/src/data.rs b/src/data.rs index 62d7a33..7238271 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,7 +1,9 @@ use std::sync::Arc; use crate::content::{ApContentReader, ApObjectHandler}; -use crate::repository::{ActivityRepository, ActorRepository, BlocklistRepository, FollowRepository}; +use crate::repository::{ + ActivityRepository, ActorRepository, BlocklistRepository, FollowRepository, +}; use crate::user::ApUserRepository; /// Typed event emitted by the federation layer. diff --git a/src/lib.rs b/src/lib.rs index ad88705..c146306 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,18 +15,20 @@ pub(crate) mod urls; pub mod user; pub mod webfinger; -pub use urls::AS_PUBLIC; pub use activitypub_federation::kinds::object::NoteType; pub use content::{ApContentReader, ApObjectHandler}; pub use data::{EventPublisher, FederationData, FederationEvent}; pub use error::Error; pub use federation::ApFederationConfig; pub use repository::{ - ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, - Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor, + ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository, + Follower, FollowerStatus, FollowingStatus, RemoteActor, }; pub use service::ActivityPubService; -pub use user::{ApActorType, ApProfileField, ApUser, ApUserRepository, ApVisibility, LookedUpActor}; +pub use urls::AS_PUBLIC; +pub use user::{ + ApActorType, ApProfileField, ApUser, ApUserRepository, ApVisibility, LookedUpActor, +}; #[cfg(test)] #[path = "tests/integration.rs"] diff --git a/src/repository/blocklist.rs b/src/repository/blocklist.rs index 80ae5c4..fa637b9 100644 --- a/src/repository/blocklist.rs +++ b/src/repository/blocklist.rs @@ -13,23 +13,8 @@ pub trait BlocklistRepository: Send + Sync { async fn is_domain_blocked(&self, domain: &str) -> Result; // ── Per-user actor blocklist ──────────────────────────────────────────── - async fn add_blocked_actor( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> Result<()>; - async fn remove_blocked_actor( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> Result<()>; - async fn get_blocked_actors( - &self, - local_user_id: uuid::Uuid, - ) -> Result>; - async fn is_actor_blocked( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> Result; + async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; + async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; + async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result>; + async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result; } diff --git a/src/repository/follow.rs b/src/repository/follow.rs index 00ea37b..3169a6e 100644 --- a/src/repository/follow.rs +++ b/src/repository/follow.rs @@ -38,16 +38,11 @@ pub trait FollowRepository: Send + Sync { remote_actor_url: &str, status: FollowerStatus, ) -> Result<()>; - async fn get_pending_followers( - &self, - local_user_id: uuid::Uuid, - ) -> Result>; + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result>; /// Return deduplicated inbox URLs (shared_inbox preferred) for accepted /// followers, excluding blocked actors/domains. DB-side filtering. - async fn get_accepted_follower_inboxes( - &self, - local_user_id: uuid::Uuid, - ) -> Result>; + async fn get_accepted_follower_inboxes(&self, local_user_id: uuid::Uuid) + -> Result>; // ── Outbound following ────────────────────────────────────────────────── async fn add_following( @@ -61,11 +56,7 @@ pub trait FollowRepository: Send + Sync { local_user_id: uuid::Uuid, remote_actor_url: &str, ) -> Result>; - async fn remove_following( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> Result<()>; + async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; async fn get_following(&self, local_user_id: uuid::Uuid) -> Result>; async fn get_following_page( &self, diff --git a/src/service/backfill.rs b/src/service/backfill.rs index abd87f8..114d2e8 100644 --- a/src/service/backfill.rs +++ b/src/service/backfill.rs @@ -1,25 +1,34 @@ -use activitypub_federation::{activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext}; +use activitypub_federation::{ + activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext, +}; use url::Url; -use crate::{ - activities::CreateActivity, - actors::get_local_actor, - federation::ApFederationConfig, -}; +use crate::{activities::CreateActivity, actors::get_local_actor, federation::ApFederationConfig}; use super::{ActivityPubService, delivery::send_with_retry}; impl ActivityPubService { pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(super::HTTP_FETCH_TIMEOUT_SECS)) + .timeout(std::time::Duration::from_secs( + super::HTTP_FETCH_TIMEOUT_SECS, + )) .build()?; let data = self.federation_config.to_request_data(); let actor = url::Url::parse(actor_url)?; - let root: serde_json::Value = client.get(outbox_url).header("Accept", "application/activity+json").send().await?.json().await?; + let root: serde_json::Value = client + .get(outbox_url) + .header("Accept", "application/activity+json") + .send() + .await? + .json() + .await?; let first = match root.get("first").and_then(|v| v.as_str()) { Some(url) => url.to_string(), - None => { tracing::debug!(outbox = %outbox_url, "outbox has no first page"); return Ok(()); } + None => { + tracing::debug!(outbox = %outbox_url, "outbox has no first page"); + return Ok(()); + } }; let mut current_url = first; let mut visited = std::collections::HashSet::new(); @@ -28,16 +37,40 @@ impl ActivityPubService { tracing::warn!(url = %current_url, "backfill: loop detected, stopping"); break; } - let page: serde_json::Value = match client.get(¤t_url).header("Accept", "application/activity+json").send().await { - Ok(resp) => match resp.json().await { Ok(v) => v, Err(e) => { tracing::error!(error = %e, "backfill: failed to parse page JSON"); break; } }, - Err(e) => { tracing::error!(error = %e, "backfill: HTTP request failed"); break; } + let page: serde_json::Value = match client + .get(¤t_url) + .header("Accept", "application/activity+json") + .send() + .await + { + Ok(resp) => match resp.json().await { + Ok(v) => v, + Err(e) => { + tracing::error!(error = %e, "backfill: failed to parse page JSON"); + break; + } + }, + Err(e) => { + tracing::error!(error = %e, "backfill: HTTP request failed"); + break; + } }; if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) { for item in items { let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); - if activity_type != "Create" && activity_type != "Add" { continue; } - let Some(object) = item.get("object").filter(|o| o.is_object()).cloned() else { continue }; - let Some(ap_id) = object.get("id").and_then(|v| v.as_str()).and_then(|s| url::Url::parse(s).ok()) else { continue }; + if activity_type != "Create" && activity_type != "Add" { + continue; + } + let Some(object) = item.get("object").filter(|o| o.is_object()).cloned() else { + continue; + }; + let Some(ap_id) = object + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| url::Url::parse(s).ok()) + else { + continue; + }; if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await { tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item"); } @@ -60,7 +93,16 @@ impl ActivityPubService { let max_attempts = self.delivery_max_attempts; let initial_delay = self.delivery_initial_delay_secs; tokio::spawn(async move { - if let Err(e) = ActivityPubService::run_backfill(config, base_url, owner_user_id, follower_inbox_url, max_attempts, initial_delay).await { + if let Err(e) = ActivityPubService::run_backfill( + config, + base_url, + owner_user_id, + follower_inbox_url, + max_attempts, + initial_delay, + ) + .await + { tracing::warn!(error = %e, "backfill: task failed"); } }); @@ -76,7 +118,9 @@ impl ActivityPubService { ) -> anyhow::Result<()> { const BATCH_SIZE: usize = 20; let data = config.to_request_data(); - let local_actor = get_local_actor(owner_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let local_actor = get_local_actor(owner_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; let inbox = Url::parse(&follower_inbox_url)?; // Cursor-based pagination via get_local_objects_page (newest-first). @@ -105,18 +149,27 @@ impl ActivityPubService { uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes()) ))?; let create = CreateActivity { - id: create_id, kind: Default::default(), + id: create_id, + kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), - object: object_json.clone(), to: vec![], cc: vec![], bto: vec![], bcc: vec![], + object: object_json.clone(), + to: vec![], + cc: vec![], + bto: vec![], + bcc: vec![], }; let sends = SendActivityTask::prepare( &WithContext::new_default(create), &local_actor, vec![inbox.clone()], &data, - ).await?; + ) + .await?; total += 1; - if send_with_retry(sends, &data, max_attempts, initial_delay).await.is_empty() { + if send_with_retry(sends, &data, max_attempts, initial_delay) + .await + .is_empty() + { success_count += 1; } else { failure_count += 1; @@ -127,7 +180,10 @@ impl ActivityPubService { break; } - tokio::time::sleep(std::time::Duration::from_millis(super::BATCH_FETCH_SLEEP_MS)).await; + tokio::time::sleep(std::time::Duration::from_millis( + super::BATCH_FETCH_SLEEP_MS, + )) + .await; } tracing::info!( diff --git a/src/service/broadcast.rs b/src/service/broadcast.rs index 64b2c5a..efc484c 100644 --- a/src/service/broadcast.rs +++ b/src/service/broadcast.rs @@ -1,10 +1,12 @@ -use activitypub_federation::{fetch::object_id::ObjectId, protocol::context::WithContext, traits::Object}; +use activitypub_federation::{ + fetch::object_id::ObjectId, protocol::context::WithContext, traits::Object, +}; use url::Url; use crate::{ activities::{ - AddActivity, AnnounceActivity, CreateActivity, DeleteActivity, - MoveActivity, UndoActivity, UpdateActivity, + AddActivity, AnnounceActivity, CreateActivity, DeleteActivity, MoveActivity, UndoActivity, + UpdateActivity, }, actors::get_local_actor, urls::activity_url, @@ -22,10 +24,18 @@ impl ActivityPubService { let announce_id = url::Url::parse(&format!( "{}/activities/announce/{}", self.base_url, - uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", local_user_id, object_ap_id).as_bytes()), - )).map_err(|e| anyhow::anyhow!("{e}"))?; + uuid::Uuid::new_v5( + &uuid::Uuid::NAMESPACE_URL, + format!("{}/{}", local_user_id, object_ap_id).as_bytes() + ), + )) + .map_err(|e| anyhow::anyhow!("{e}"))?; let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let Some((local_actor, inboxes)) = + self.accepted_follower_inboxes(&data, local_user_id).await? + else { + return Ok(()); + }; let announce = AnnounceActivity { id: announce_id, kind: Default::default(), @@ -35,8 +45,11 @@ impl ActivityPubService { to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, announce).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, announce) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_undo_announce_to_followers( @@ -47,19 +60,30 @@ impl ActivityPubService { let announce_id = url::Url::parse(&format!( "{}/activities/announce/{}", self.base_url, - uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", local_user_id, object_ap_id).as_bytes()), - )).map_err(|e| anyhow::anyhow!("{e}"))?; + uuid::Uuid::new_v5( + &uuid::Uuid::NAMESPACE_URL, + format!("{}/{}", local_user_id, object_ap_id).as_bytes() + ), + )) + .map_err(|e| anyhow::anyhow!("{e}"))?; let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let Some((local_actor, inboxes)) = + self.accepted_follower_inboxes(&data, local_user_id).await? + else { + return Ok(()); + }; let undo = UndoActivity { id: undo_id, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: serde_json::json!({"type":"Announce","id":announce_id.to_string(),"actor":local_actor.ap_id.to_string(),"object":object_ap_id.to_string()}), }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, undo) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_like_to_inbox( @@ -69,11 +93,16 @@ impl ActivityPubService { author_inbox_url: url::Url, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(liker_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let local_actor = get_local_actor(liker_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; let like_id = url::Url::parse(&format!( "{}/activities/like/{}", self.base_url, - uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", liker_user_id, object_ap_id).as_bytes()), + uuid::Uuid::new_v5( + &uuid::Uuid::NAMESPACE_URL, + format!("{}/{}", liker_user_id, object_ap_id).as_bytes() + ), ))?; let like = crate::activities::LikeActivity { id: like_id, @@ -81,8 +110,11 @@ impl ActivityPubService { actor: ObjectId::from(local_actor.ap_id.clone()), object: object_ap_id, }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![author_inbox_url], like).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![author_inbox_url], like) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_undo_like_to_inbox( @@ -92,11 +124,16 @@ impl ActivityPubService { author_inbox_url: url::Url, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(liker_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let local_actor = get_local_actor(liker_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; let like_id = url::Url::parse(&format!( "{}/activities/like/{}", self.base_url, - uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", liker_user_id, object_ap_id).as_bytes()), + uuid::Uuid::new_v5( + &uuid::Uuid::NAMESPACE_URL, + format!("{}/{}", liker_user_id, object_ap_id).as_bytes() + ), ))?; let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; let undo = UndoActivity { @@ -105,8 +142,11 @@ impl ActivityPubService { actor: ObjectId::from(local_actor.ap_id.clone()), object: serde_json::json!({"type":"Like","id":like_id.to_string(),"actor":local_actor.ap_id.to_string(),"object":object_ap_id.to_string()}), }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![author_inbox_url], undo).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![author_inbox_url], undo) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_delete_to_followers( @@ -115,7 +155,11 @@ impl ActivityPubService { ap_id: Url, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let Some((local_actor, inboxes)) = + self.accepted_follower_inboxes(&data, local_user_id).await? + else { + return Ok(()); + }; let delete = DeleteActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), @@ -124,8 +168,11 @@ impl ActivityPubService { to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, delete).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, delete) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_add_to_followers( @@ -135,7 +182,11 @@ impl ActivityPubService { object: serde_json::Value, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let Some((local_actor, inboxes)) = + self.accepted_follower_inboxes(&data, local_user_id).await? + else { + return Ok(()); + }; let add = AddActivity { id: ap_id, kind: Default::default(), @@ -144,8 +195,11 @@ impl ActivityPubService { to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, add).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, add) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_undo_add_to_followers( @@ -154,15 +208,22 @@ impl ActivityPubService { watchlist_entry_ap_id: Url, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let Some((local_actor, inboxes)) = + self.accepted_follower_inboxes(&data, local_user_id).await? + else { + return Ok(()); + }; let undo = UndoActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: serde_json::json!({"type":"Add","id":watchlist_entry_ap_id.as_str(),"object":{"id":watchlist_entry_ap_id.as_str()}}), }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, undo) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_create_note( @@ -175,13 +236,18 @@ impl ActivityPubService { return Ok(()); } let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let Some((local_actor, inboxes)) = + self.accepted_follower_inboxes(&data, local_user_id).await? + else { + return Ok(()); + }; let note_id_str = note["id"].as_str().unwrap_or(""); let create_id = Url::parse(&format!( "{}/activities/create/{}", self.base_url, uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, note_id_str.as_bytes()) - )).map_err(|e| anyhow::anyhow!("{e}"))?; + )) + .map_err(|e| anyhow::anyhow!("{e}"))?; let (to, cc) = visibility_addressing(visibility, &local_actor.followers_url); let create = CreateActivity { id: create_id, @@ -193,8 +259,11 @@ impl ActivityPubService { bto: vec![], bcc: vec![], }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, create).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, create) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_update_note( @@ -207,7 +276,11 @@ impl ActivityPubService { return Ok(()); } let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let Some((local_actor, inboxes)) = + self.accepted_follower_inboxes(&data, local_user_id).await? + else { + return Ok(()); + }; let (to, cc) = visibility_addressing(visibility, &local_actor.followers_url); let update = crate::activities::UpdateActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, @@ -217,16 +290,30 @@ impl ActivityPubService { to, cc, }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, update) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let person = local_actor.clone().into_json(&data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let person_json = serde_json::to_value(WithContext::new(person, crate::urls::actor_ap_context()))?; - let update_id = Url::parse(&format!("{}/activities/update/{}", self.base_url, uuid::Uuid::new_v4()))?; + let local_actor = get_local_actor(user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let person = local_actor + .clone() + .into_json(&data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let person_json = + serde_json::to_value(WithContext::new(person, crate::urls::actor_ap_context()))?; + let update_id = Url::parse(&format!( + "{}/activities/update/{}", + self.base_url, + uuid::Uuid::new_v4() + ))?; let update = UpdateActivity { id: update_id, kind: Default::default(), @@ -240,8 +327,11 @@ impl ActivityPubService { return Ok(()); }; tracing::info!(%user_id, inbox_count = inboxes.len(), "broadcasting actor update"); - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, update) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } pub async fn broadcast_move( @@ -250,7 +340,9 @@ impl ActivityPubService { new_actor_url: url::Url, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let local_actor = get_local_actor(user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else { tracing::info!(%user_id, "broadcast_move: no accepted followers"); return Ok(()); @@ -262,8 +354,11 @@ impl ActivityPubService { object: local_actor.ap_id.clone(), target: new_actor_url.clone(), }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, move_activity).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, inboxes, move_activity) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; tracing::info!(%user_id, target = %new_actor_url, "broadcast_move: dispatched"); Ok(()) } @@ -280,10 +375,7 @@ pub(super) fn visibility_addressing( vec![crate::urls::AS_PUBLIC.to_string()], vec![followers_url.to_string()], ), - ApVisibility::FollowersOnly => ( - vec![followers_url.to_string()], - vec![], - ), + ApVisibility::FollowersOnly => (vec![followers_url.to_string()], vec![]), ApVisibility::Private => (vec![], vec![]), } } diff --git a/src/service/delivery.rs b/src/service/delivery.rs index d40d185..29c5580 100644 --- a/src/service/delivery.rs +++ b/src/service/delivery.rs @@ -57,13 +57,23 @@ impl Activity for RawActivity { type DataType = FederationData; type Error = Error; - fn id(&self) -> &Url { &self.id } - fn actor(&self) -> &Url { &self.actor_url } + fn id(&self) -> &Url { + &self.id + } + fn actor(&self) -> &Url { + &self.actor_url + } - async fn verify(&self, _data: &activitypub_federation::config::Data) -> Result<(), Self::Error> { + async fn verify( + &self, + _data: &activitypub_federation::config::Data, + ) -> Result<(), Self::Error> { Ok(()) } - async fn receive(self, _data: &activitypub_federation::config::Data) -> Result<(), Self::Error> { + async fn receive( + self, + _data: &activitypub_federation::config::Data, + ) -> Result<(), Self::Error> { Ok(()) } } @@ -96,8 +106,7 @@ impl ActivityPubService { let max_attempts = self.delivery_max_attempts; let initial_delay = self.delivery_initial_delay_secs; tokio::spawn(async move { - let failures = - send_with_retry(sends, &data, max_attempts, initial_delay).await; + let failures = send_with_retry(sends, &data, max_attempts, initial_delay).await; if !failures.is_empty() { tracing::warn!(count = failures.len(), "some deliveries failed permanently"); } @@ -128,9 +137,12 @@ impl ActivityPubService { .and_then(|v| v.as_str()) .and_then(|s| Url::parse(s).ok()) .unwrap_or_else(|| actor.ap_id.clone()); - let raw = RawActivity { id, actor_url, value: activity.clone() }; - let sends = - SendActivityTask::prepare(&raw, &actor, vec![inbox.clone()], &data).await?; + let raw = RawActivity { + id, + actor_url, + value: activity.clone(), + }; + let sends = SendActivityTask::prepare(&raw, &actor, vec![inbox.clone()], &data).await?; let failures = send_with_retry( sends, &data, @@ -172,7 +184,8 @@ impl ActivityPubService { where A: Activity + Serialize + Debug + Send + Sync, { - let with_ctx = activitypub_federation::protocol::context::WithContext::new_default(activity); + let with_ctx = + activitypub_federation::protocol::context::WithContext::new_default(activity); let activity_json = serde_json::to_value(&with_ctx)?; let sends = SendActivityTask::prepare(&with_ctx, local_actor, inboxes.clone(), data).await?; diff --git a/src/service/follow.rs b/src/service/follow.rs index e07659a..cd3f15a 100644 --- a/src/service/follow.rs +++ b/src/service/follow.rs @@ -20,112 +20,228 @@ impl ActivityPubService { return self.follow_local(local_user_id, parts[0], &data).await; } let remote_actor = self.webfinger_https(handle, &data).await?; - let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; let follow_id_str = follow_id.to_string(); let remote = RemoteActor { url: remote_actor.ap_id.to_string(), - handle: format!("{}@{}", remote_actor.username, remote_actor.ap_id.host_str().unwrap_or("")), + handle: format!( + "{}@{}", + remote_actor.username, + remote_actor.ap_id.host_str().unwrap_or("") + ), inbox_url: remote_actor.inbox_url.to_string(), - shared_inbox_url: remote_actor.shared_inbox_url.as_ref().map(|u| u.to_string()), + shared_inbox_url: remote_actor + .shared_inbox_url + .as_ref() + .map(|u| u.to_string()), display_name: Some(remote_actor.username.clone()), avatar_url: remote_actor.avatar_url.as_ref().map(|u| u.to_string()), outbox_url: Some(remote_actor.outbox_url.to_string()), }; // Save BEFORE delivering — prevents lost state on process restart. - data.follow_repo.add_following(local_user_id, remote, &follow_id_str).await?; + data.follow_repo + .add_following(local_user_id, remote, &follow_id_str) + .await?; let follow = FollowActivity { id: Url::parse(&follow_id_str)?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: ObjectId::from(remote_actor.ap_id.clone()), }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![remote_actor.inbox()], follow).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![remote_actor.inbox()], follow) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await } - pub async fn unfollow(&self, local_user_id: uuid::Uuid, actor_url_str: &str) -> anyhow::Result<()> { + pub async fn unfollow( + &self, + local_user_id: uuid::Uuid, + actor_url_str: &str, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); if actor_url_str.starts_with(&self.base_url) { - return self.unfollow_local(local_user_id, actor_url_str, &data).await; + return self + .unfollow_local(local_user_id, actor_url_str, &data) + .await; } - let remote = data.actor_repo.get_remote_actor(actor_url_str).await? + let remote = data + .actor_repo + .get_remote_actor(actor_url_str) + .await? .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; - let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; let remote_ap_id = Url::parse(actor_url_str)?; let inbox = Url::parse(&remote.inbox_url)?; - let follow_id = data.follow_repo.get_follow_activity_id(local_user_id, actor_url_str).await? + let follow_id = data + .follow_repo + .get_follow_activity_id(local_user_id, actor_url_str) + .await? .and_then(|id| Url::parse(&id).ok()) - .unwrap_or_else(|| activity_url(&self.base_url).unwrap_or_else(|_| remote_ap_id.clone())); - let follow = FollowActivity { id: follow_id, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: ObjectId::from(remote_ap_id) }; + .unwrap_or_else(|| { + activity_url(&self.base_url).unwrap_or_else(|_| remote_ap_id.clone()) + }); + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_ap_id), + }; let undo = UndoActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: serde_json::to_value(&follow).map_err(|e| anyhow::anyhow!("{e}"))?, }; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], undo).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; - data.follow_repo.remove_following(local_user_id, actor_url_str).await?; - data.object_handler.on_actor_removed(&Url::parse(actor_url_str)?).await?; + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![inbox], undo) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; + data.follow_repo + .remove_following(local_user_id, actor_url_str) + .await?; + data.object_handler + .on_actor_removed(&Url::parse(actor_url_str)?) + .await?; Ok(()) } - pub async fn accept_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { + pub async fn accept_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let remote_actor = data.actor_repo.get_remote_actor(remote_actor_url).await? + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let remote_actor = data + .actor_repo + .get_remote_actor(remote_actor_url) + .await? .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - let follow_id_str = data.follow_repo.get_follower_follow_activity_id(local_user_id, remote_actor_url).await? - .ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?; - let follow = FollowActivity { id: Url::parse(&follow_id_str)?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; - let accept = AcceptActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; - data.follow_repo.update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted).await?; + let follow_id_str = data + .follow_repo + .get_follower_follow_activity_id(local_user_id, remote_actor_url) + .await? + .ok_or_else(|| { + anyhow::anyhow!("follow activity id not found for {}", remote_actor_url) + })?; + let follow = FollowActivity { + id: Url::parse(&follow_id_str)?, + kind: Default::default(), + actor: ObjectId::from(Url::parse(remote_actor_url)?), + object: ObjectId::from(local_actor.ap_id.clone()), + }; + let accept = AcceptActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: follow, + }; + data.follow_repo + .update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted) + .await?; let inbox = Url::parse(&remote_actor.inbox_url)?; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], accept).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; - let target_inbox = remote_actor.shared_inbox_url.clone().unwrap_or_else(|| remote_actor.inbox_url.clone()); + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![inbox], accept) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; + let target_inbox = remote_actor + .shared_inbox_url + .clone() + .unwrap_or_else(|| remote_actor.inbox_url.clone()); self.spawn_backfill(local_user_id, target_inbox); Ok(()) } - pub async fn reject_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { + pub async fn reject_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let remote_actor = data.actor_repo.get_remote_actor(remote_actor_url).await? + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let remote_actor = data + .actor_repo + .get_remote_actor(remote_actor_url) + .await? .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - let follow = FollowActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; - let reject = RejectActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; + let follow = FollowActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(Url::parse(remote_actor_url)?), + object: ObjectId::from(local_actor.ap_id.clone()), + }; + let reject = RejectActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: follow, + }; let inbox = Url::parse(&remote_actor.inbox_url)?; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], reject).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; - data.follow_repo.remove_follower(local_user_id, remote_actor_url).await?; + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![inbox], reject) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; + data.follow_repo + .remove_follower(local_user_id, remote_actor_url) + .await?; Ok(()) } - pub async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + pub async fn get_pending_followers( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result> { let data = self.federation_config.to_request_data(); data.follow_repo.get_pending_followers(local_user_id).await } - pub async fn get_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + pub async fn get_accepted_followers( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result> { let data = self.federation_config.to_request_data(); - Ok(data.follow_repo.get_followers(local_user_id).await? + Ok(data + .follow_repo + .get_followers(local_user_id) + .await? .into_iter() .filter(|f| f.status == FollowerStatus::Accepted) .map(|f| f.actor) .collect()) } - pub async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result { + pub async fn count_accepted_followers( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result { let data = self.federation_config.to_request_data(); - Ok(data.follow_repo.get_followers(local_user_id).await? + Ok(data + .follow_repo + .get_followers(local_user_id) + .await? .into_iter() .filter(|f| f.status == FollowerStatus::Accepted) .count()) } - pub async fn get_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + pub async fn get_following( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result> { let data = self.federation_config.to_request_data(); data.follow_repo.get_following(local_user_id).await } @@ -135,17 +251,37 @@ impl ActivityPubService { data.follow_repo.count_following(local_user_id).await } - pub async fn remove_follower(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + pub async fn remove_follower( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.follow_repo.remove_follower(local_user_id, actor_url).await + data.follow_repo + .remove_follower(local_user_id, actor_url) + .await } - pub async fn block_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + pub async fn block_actor( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.blocklist_repo.add_blocked_actor(local_user_id, actor_url).await?; - let _ = data.follow_repo.remove_follower(local_user_id, actor_url).await; - let _ = data.follow_repo.remove_following(local_user_id, actor_url).await; - let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + data.blocklist_repo + .add_blocked_actor(local_user_id, actor_url) + .await?; + let _ = data + .follow_repo + .remove_follower(local_user_id, actor_url) + .await; + let _ = data + .follow_repo + .remove_following(local_user_id, actor_url) + .await; + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; if let Ok(Some(remote_actor)) = data.actor_repo.get_remote_actor(actor_url).await { let block = crate::activities::BlockActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, @@ -154,25 +290,48 @@ impl ActivityPubService { object: Url::parse(actor_url)?, }; let inbox = Url::parse(&remote_actor.inbox_url)?; - let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], block).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; + let (json, sends, inboxes) = self + .prepare_broadcast(&data, &local_actor, vec![inbox], block) + .await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) + .await?; } Ok(()) } - pub async fn unblock_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + pub async fn unblock_actor( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.blocklist_repo.remove_blocked_actor(local_user_id, actor_url).await + data.blocklist_repo + .remove_blocked_actor(local_user_id, actor_url) + .await } - pub async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + pub async fn get_blocked_actors( + &self, + local_user_id: uuid::Uuid, + ) -> anyhow::Result> { let data = self.federation_config.to_request_data(); - let actor_urls = data.blocklist_repo.get_blocked_actors(local_user_id).await?; + let actor_urls = data + .blocklist_repo + .get_blocked_actors(local_user_id) + .await?; let mut actors = Vec::new(); for url in actor_urls { let actor = match data.actor_repo.get_remote_actor(&url).await { Ok(Some(a)) => a, - _ => RemoteActor { url: url.clone(), handle: url.clone(), inbox_url: url.clone(), shared_inbox_url: None, display_name: None, avatar_url: None, outbox_url: None }, + _ => RemoteActor { + url: url.clone(), + handle: url.clone(), + inbox_url: url.clone(), + shared_inbox_url: None, + display_name: None, + avatar_url: None, + outbox_url: None, + }, }; actors.push(actor); } @@ -185,15 +344,27 @@ impl ActivityPubService { target_username: &str, data: &activitypub_federation::config::Data, ) -> anyhow::Result<()> { - let target = data.user_repo.find_by_username(target_username).await? + let target = data + .user_repo + .find_by_username(target_username) + .await? .ok_or_else(|| anyhow::anyhow!("user not found: {}", target_username))?; if target.id == local_user_id { return Err(anyhow::anyhow!("cannot follow yourself")); } let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); let target_actor_url = crate::urls::actor_url(&self.base_url, target.id); - let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?.to_string(); - data.follow_repo.add_follower(target.id, &follower_actor_url, FollowerStatus::Accepted, &follow_id).await?; + let follow_id = activity_url(&self.base_url) + .map_err(|e| anyhow::anyhow!("{e}"))? + .to_string(); + data.follow_repo + .add_follower( + target.id, + &follower_actor_url, + FollowerStatus::Accepted, + &follow_id, + ) + .await?; let target_as_remote = RemoteActor { url: target_actor_url.to_string(), handle: format!("{}@{}", target.username, data.domain), @@ -203,8 +374,16 @@ impl ActivityPubService { avatar_url: None, outbox_url: None, }; - data.follow_repo.add_following(local_user_id, target_as_remote, &follow_id).await?; - data.follow_repo.update_following_status(local_user_id, target_actor_url.as_ref(), FollowingStatus::Accepted).await?; + data.follow_repo + .add_following(local_user_id, target_as_remote, &follow_id) + .await?; + data.follow_repo + .update_following_status( + local_user_id, + target_actor_url.as_ref(), + FollowingStatus::Accepted, + ) + .await?; tracing::info!(follower = %local_user_id, followee = %target.id, "local follow"); Ok(()) } @@ -219,8 +398,12 @@ impl ActivityPubService { let target_user_id = crate::urls::extract_user_id_from_url(&target_url) .ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?; let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); - data.follow_repo.remove_follower(target_user_id, &local_actor_url).await?; - data.follow_repo.remove_following(local_user_id, target_actor_url).await?; + data.follow_repo + .remove_follower(target_user_id, &local_actor_url) + .await?; + data.follow_repo + .remove_following(local_user_id, target_actor_url) + .await?; tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow"); Ok(()) } diff --git a/src/service/mod.rs b/src/service/mod.rs index fae319e..606acc5 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,9 +1,6 @@ use std::sync::Arc; -use activitypub_federation::{ - protocol::context::WithContext, - traits::Object, -}; +use activitypub_federation::{protocol::context::WithContext, traits::Object}; use axum::{Router, extract::DefaultBodyLimit, routing::get, routing::post}; use url::Url; @@ -18,10 +15,8 @@ use crate::{ nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, outbox::outbox_handler, repository::{ - ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, - FollowRepository, FollowerStatus, FollowingStatus, RemoteActor, + ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository, }, - urls::activity_url, user::ApUserRepository, webfinger::webfinger_handler, }; @@ -67,54 +62,91 @@ pub struct ActivityPubServiceBuilder { impl ActivityPubServiceBuilder { pub fn activity_repo(mut self, v: Arc) -> Self { - self.activity_repo = Some(v); self + self.activity_repo = Some(v); + self } pub fn follow_repo(mut self, v: Arc) -> Self { - self.follow_repo = Some(v); self + self.follow_repo = Some(v); + self } pub fn actor_repo(mut self, v: Arc) -> Self { - self.actor_repo = Some(v); self + self.actor_repo = Some(v); + self } pub fn blocklist_repo(mut self, v: Arc) -> Self { - self.blocklist_repo = Some(v); self + self.blocklist_repo = Some(v); + self } pub fn user_repo(mut self, v: Arc) -> Self { - self.user_repo = Some(v); self + self.user_repo = Some(v); + self } pub fn content_reader(mut self, v: Arc) -> Self { - self.content_reader = Some(v); self + self.content_reader = Some(v); + self } pub fn object_handler(mut self, v: Arc) -> Self { - self.object_handler = Some(v); self + self.object_handler = Some(v); + self + } + pub fn allow_registration(mut self, v: bool) -> Self { + self.allow_registration = v; + self + } + pub fn software_name(mut self, v: impl Into) -> Self { + self.software_name = v.into(); + self + } + pub fn debug(mut self, v: bool) -> Self { + self.debug = v; + self } - pub fn allow_registration(mut self, v: bool) -> Self { self.allow_registration = v; self } - pub fn software_name(mut self, v: impl Into) -> Self { self.software_name = v.into(); self } - pub fn debug(mut self, v: bool) -> Self { self.debug = v; self } pub fn event_publisher(mut self, v: Arc) -> Self { - self.event_publisher = Some(v); self + self.event_publisher = Some(v); + self + } + pub fn delivery_max_attempts(mut self, v: u32) -> Self { + self.delivery_max_attempts = v; + self + } + pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { + self.delivery_initial_delay_secs = v; + self } - pub fn delivery_max_attempts(mut self, v: u32) -> Self { self.delivery_max_attempts = v; self } - pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { self.delivery_initial_delay_secs = v; self } pub async fn build(self) -> anyhow::Result { - let activity_repo = self.activity_repo + let activity_repo = self + .activity_repo .ok_or_else(|| anyhow::anyhow!("activity_repo required — call .activity_repo(arc)"))?; - let follow_repo = self.follow_repo + let follow_repo = self + .follow_repo .ok_or_else(|| anyhow::anyhow!("follow_repo required — call .follow_repo(arc)"))?; - let actor_repo = self.actor_repo + let actor_repo = self + .actor_repo .ok_or_else(|| anyhow::anyhow!("actor_repo required — call .actor_repo(arc)"))?; - let blocklist_repo = self.blocklist_repo - .ok_or_else(|| anyhow::anyhow!("blocklist_repo required — call .blocklist_repo(arc)"))?; - let user_repo = self.user_repo + let blocklist_repo = self.blocklist_repo.ok_or_else(|| { + anyhow::anyhow!("blocklist_repo required — call .blocklist_repo(arc)") + })?; + let user_repo = self + .user_repo .ok_or_else(|| anyhow::anyhow!("user_repo required — call .user_repo(arc)"))?; - let content_reader = self.content_reader - .ok_or_else(|| anyhow::anyhow!("content_reader required — call .content_reader(arc)"))?; - let object_handler = self.object_handler - .ok_or_else(|| anyhow::anyhow!("object_handler required — call .object_handler(arc)"))?; + let content_reader = self.content_reader.ok_or_else(|| { + anyhow::anyhow!("content_reader required — call .content_reader(arc)") + })?; + let object_handler = self.object_handler.ok_or_else(|| { + anyhow::anyhow!("object_handler required — call .object_handler(arc)") + })?; let data = FederationData::new( - activity_repo, follow_repo, actor_repo, blocklist_repo, - user_repo, content_reader, object_handler, - self.base_url.clone(), self.allow_registration, self.software_name, + activity_repo, + follow_repo, + actor_repo, + blocklist_repo, + user_repo, + content_reader, + object_handler, + self.base_url.clone(), + self.allow_registration, + self.software_name, self.event_publisher, ); let federation_config = ApFederationConfig::new(data, self.debug).await?; @@ -147,9 +179,15 @@ impl ActivityPubService { } } - pub fn federation_config(&self) -> &ApFederationConfig { &self.federation_config } - pub fn request_data(&self) -> activitypub_federation::config::Data { self.federation_config.to_request_data() } - pub fn base_url(&self) -> &str { &self.base_url } + pub fn federation_config(&self) -> &ApFederationConfig { + &self.federation_config + } + pub fn request_data(&self) -> activitypub_federation::config::Data { + self.federation_config.to_request_data() + } + pub fn base_url(&self) -> &str { + &self.base_url + } /// Returns the ActivityPub router. Inbox routes enforce a 1 MB body limit. pub fn router(&self) -> Router @@ -160,9 +198,15 @@ impl ActivityPubService { .route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler)) .route("/nodeinfo/2.0", get(nodeinfo_handler)) .route("/.well-known/webfinger", get(webfinger_handler)) - .route("/inbox", post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024))) + .route( + "/inbox", + post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024)), + ) .route("/users/{id}", get(actor_handler)) - .route("/users/{id}/inbox", post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024))) + .route( + "/users/{id}/inbox", + post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024)), + ) .route("/users/{id}/outbox", get(outbox_handler)) .route("/users/{id}/followers", get(followers_handler)) .route("/users/{id}/following", get(following_handler)) @@ -172,12 +216,24 @@ impl ActivityPubService { pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result { let uuid = uuid::Uuid::parse_str(user_id_str)?; let data = self.federation_config.to_request_data(); - let actor = get_local_actor(uuid, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let person = actor.into_json(&data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - Ok(serde_json::to_string(&WithContext::new(person, crate::urls::actor_ap_context()))?) + let actor = get_local_actor(uuid, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let person = actor + .into_json(&data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + Ok(serde_json::to_string(&WithContext::new( + person, + crate::urls::actor_ap_context(), + ))?) } - pub async fn followers_collection_json(&self, user_id: uuid::Uuid, page: Option) -> anyhow::Result { + pub async fn followers_collection_json( + &self, + user_id: uuid::Uuid, + page: Option, + ) -> anyhow::Result { const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; const PAGE_SIZE: usize = 20; let data = self.federation_config.to_request_data(); @@ -186,11 +242,16 @@ impl ActivityPubService { let obj = if let Some(p) = page { let p = p.max(1); let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; - let followers = data.follow_repo.get_followers_page(user_id, offset as u32, PAGE_SIZE).await?; + let followers = data + .follow_repo + .get_followers_page(user_id, offset as u32, PAGE_SIZE) + .await?; let has_next = offset + followers.len() < total; let items: Vec = followers.into_iter().map(|f| f.actor.url).collect(); let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); - if has_next { obj["next"] = serde_json::json!(format!("{}?page={}",collection_id,p+1)); } + if has_next { + obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1)); + } obj } else { serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollection","id":collection_id,"totalItems":total,"first":format!("{}?page=1",collection_id)}) @@ -198,7 +259,11 @@ impl ActivityPubService { Ok(serde_json::to_string(&obj)?) } - pub async fn following_collection_json(&self, user_id: uuid::Uuid, page: Option) -> anyhow::Result { + pub async fn following_collection_json( + &self, + user_id: uuid::Uuid, + page: Option, + ) -> anyhow::Result { const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; const PAGE_SIZE: usize = 20; let data = self.federation_config.to_request_data(); @@ -207,11 +272,16 @@ impl ActivityPubService { let obj = if let Some(p) = page { let p = p.max(1); let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; - let following = data.follow_repo.get_following_page(user_id, offset as u32, PAGE_SIZE).await?; + let following = data + .follow_repo + .get_following_page(user_id, offset as u32, PAGE_SIZE) + .await?; let has_next = offset + following.len() < total; let items: Vec = following.into_iter().map(|a| a.url).collect(); let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); - if has_next { obj["next"] = serde_json::json!(format!("{}?page={}",collection_id,p+1)); } + if has_next { + obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1)); + } obj } else { serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollection","id":collection_id,"totalItems":total,"first":format!("{}?page=1",collection_id)}) @@ -219,35 +289,67 @@ impl ActivityPubService { Ok(serde_json::to_string(&obj)?) } - pub async fn mark_follower_accepted(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + pub async fn mark_follower_accepted( + &self, + user_id: uuid::Uuid, + actor_url: &str, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.follow_repo.update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted).await.map_err(|e| anyhow::anyhow!("{e}")) + data.follow_repo + .update_follower_status( + user_id, + actor_url, + crate::repository::FollowerStatus::Accepted, + ) + .await + .map_err(|e| anyhow::anyhow!("{e}")) } - pub async fn mark_follower_rejected(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + pub async fn mark_follower_rejected( + &self, + user_id: uuid::Uuid, + actor_url: &str, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.follow_repo.remove_follower(user_id, actor_url).await.map_err(|e| anyhow::anyhow!("{e}")) + data.follow_repo + .remove_follower(user_id, actor_url) + .await + .map_err(|e| anyhow::anyhow!("{e}")) } - pub async fn lookup_actor_by_handle(&self, handle: &str) -> anyhow::Result { + pub async fn lookup_actor_by_handle( + &self, + handle: &str, + ) -> anyhow::Result { tracing::info!(handle, "looking up remote actor"); let data = self.federation_config.to_request_data(); - let actor = self.webfinger_https(handle, &data).await + let actor = self + .webfinger_https(handle, &data) + .await .inspect_err(|e| tracing::warn!(handle, error = %e, "actor lookup failed"))?; let domain = actor.ap_id.host_str().unwrap_or("").to_string(); tracing::info!(handle = format!("{}@{}", actor.username, domain), ap_url = %actor.ap_id, "remote actor resolved"); Ok(crate::user::LookedUpActor { handle: format!("{}@{}", actor.username, domain), - display_name: actor.display_name, bio: actor.bio, - avatar_url: actor.avatar_url, banner_url: actor.banner_url, - ap_url: actor.ap_id, outbox_url: Some(actor.outbox_url), - followers_url: Some(actor.followers_url), following_url: Some(actor.following_url), - also_known_as: actor.also_known_as, profile_url: actor.profile_url, + display_name: actor.display_name, + bio: actor.bio, + avatar_url: actor.avatar_url, + banner_url: actor.banner_url, + ap_url: actor.ap_id, + outbox_url: Some(actor.outbox_url), + followers_url: Some(actor.followers_url), + following_url: Some(actor.following_url), + also_known_as: actor.also_known_as, + profile_url: actor.profile_url, attachment: actor.attachment, }) } - pub async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()> { + pub async fn add_blocked_domain( + &self, + domain: &str, + reason: Option<&str>, + ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); data.blocklist_repo.add_blocked_domain(domain, reason).await } @@ -269,13 +371,22 @@ impl ActivityPubService { data: &activitypub_federation::config::Data, local_user_id: uuid::Uuid, ) -> anyhow::Result)>> { - let local_actor = get_local_actor(local_user_id, data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let inbox_strs = data.follow_repo.get_accepted_follower_inboxes(local_user_id).await?; - if inbox_strs.is_empty() { return Ok(None); } + let local_actor = get_local_actor(local_user_id, data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let inbox_strs = data + .follow_repo + .get_accepted_follower_inboxes(local_user_id) + .await?; + if inbox_strs.is_empty() { + return Ok(None); + } let inboxes: Vec = inbox_strs.into_iter().filter_map(|s| { Url::parse(&s).map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")).ok() }).collect(); - if inboxes.is_empty() { return Ok(None); } + if inboxes.is_empty() { + return Ok(None); + } Ok(Some((local_actor, inboxes))) } @@ -285,20 +396,39 @@ impl ActivityPubService { data: &activitypub_federation::config::Data, ) -> anyhow::Result { let normalized = handle.trim_start_matches('@'); - let at = normalized.rfind('@').ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?; + let at = normalized + .rfind('@') + .ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?; let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]); - let wf_url = format!("https://{}/.well-known/webfinger?resource=acct:{}@{}", domain_str, user, domain_str); + let wf_url = format!( + "https://{}/.well-known/webfinger?resource=acct:{}@{}", + domain_str, user, domain_str + ); tracing::debug!(handle, wf_url, "resolving webfinger"); - let wf: serde_json::Value = reqwest::Client::new().get(&wf_url) + let wf: serde_json::Value = reqwest::Client::new() + .get(&wf_url) .header("Accept", "application/jrd+json, application/json") - .send().await?.json().await?; - let self_href = wf["links"].as_array() - .and_then(|links| links.iter().find(|l| l["rel"].as_str() == Some("self") && l["type"].as_str() == Some("application/activity+json"))) + .send() + .await? + .json() + .await?; + let self_href = wf["links"] + .as_array() + .and_then(|links| { + links.iter().find(|l| { + l["rel"].as_str() == Some("self") + && l["type"].as_str() == Some("application/activity+json") + }) + }) .and_then(|l| l["href"].as_str()) - .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))?.to_owned(); + .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))? + .to_owned(); tracing::debug!(handle, self_href, "webfinger resolved, fetching actor"); - let actor: DbActor = activitypub_federation::fetch::object_id::ObjectId::from(url::Url::parse(&self_href)?) - .dereference(data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let actor: DbActor = + activitypub_federation::fetch::object_id::ObjectId::from(url::Url::parse(&self_href)?) + .dereference(data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; Ok(actor) } } diff --git a/src/tests/integration.rs b/src/tests/integration.rs index fbd55e0..5711bee 100644 --- a/src/tests/integration.rs +++ b/src/tests/integration.rs @@ -1,6 +1,5 @@ // src/tests/integration.rs /// Integration tests with in-memory trait stubs. - use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -12,8 +11,8 @@ use url::Url; use crate::content::{ApContentReader, ApObjectHandler}; use crate::data::FederationData; use crate::repository::{ - ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, - Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor, + ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository, + Follower, FollowerStatus, FollowingStatus, RemoteActor, }; use crate::user::{ApActorType, ApProfileField, ApUser, ApUserRepository}; @@ -42,24 +41,98 @@ struct MemFollowRepo; #[async_trait] impl FollowRepository for MemFollowRepo { - async fn add_follower(&self, _: uuid::Uuid, _: &str, _: FollowerStatus, _: &str) -> anyhow::Result<()> { Ok(()) } - async fn get_follower_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result> { Ok(None) } - async fn remove_follower(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } - async fn get_followers(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } - async fn get_followers_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result> { Ok(vec![]) } - async fn count_followers(&self, _: uuid::Uuid) -> anyhow::Result { Ok(0) } - async fn update_follower_status(&self, _: uuid::Uuid, _: &str, _: FollowerStatus) -> anyhow::Result<()> { Ok(()) } - async fn get_pending_followers(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } - async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } - async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> { Ok(()) } - async fn get_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result> { Ok(None) } - async fn remove_following(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } - async fn get_following(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } - async fn get_following_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result> { Ok(vec![]) } - async fn count_following(&self, _: uuid::Uuid) -> anyhow::Result { Ok(0) } - async fn update_following_status(&self, _: uuid::Uuid, _: &str, _: FollowingStatus) -> anyhow::Result<()> { Ok(()) } - async fn get_following_outbox_url(&self, _: uuid::Uuid, _: &str) -> anyhow::Result> { Ok(None) } - async fn migrate_follower_actor(&self, _: &str, _: &str) -> anyhow::Result> { Ok(vec![]) } + async fn add_follower( + &self, + _: uuid::Uuid, + _: &str, + _: FollowerStatus, + _: &str, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn get_follower_follow_activity_id( + &self, + _: uuid::Uuid, + _: &str, + ) -> anyhow::Result> { + Ok(None) + } + async fn remove_follower(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { + Ok(()) + } + async fn get_followers(&self, _: uuid::Uuid) -> anyhow::Result> { + Ok(vec![]) + } + async fn get_followers_page( + &self, + _: uuid::Uuid, + _: u32, + _: usize, + ) -> anyhow::Result> { + Ok(vec![]) + } + async fn count_followers(&self, _: uuid::Uuid) -> anyhow::Result { + Ok(0) + } + async fn update_follower_status( + &self, + _: uuid::Uuid, + _: &str, + _: FollowerStatus, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn get_pending_followers(&self, _: uuid::Uuid) -> anyhow::Result> { + Ok(vec![]) + } + async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result> { + Ok(vec![]) + } + async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> { + Ok(()) + } + async fn get_follow_activity_id( + &self, + _: uuid::Uuid, + _: &str, + ) -> anyhow::Result> { + Ok(None) + } + async fn remove_following(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { + Ok(()) + } + async fn get_following(&self, _: uuid::Uuid) -> anyhow::Result> { + Ok(vec![]) + } + async fn get_following_page( + &self, + _: uuid::Uuid, + _: u32, + _: usize, + ) -> anyhow::Result> { + Ok(vec![]) + } + async fn count_following(&self, _: uuid::Uuid) -> anyhow::Result { + Ok(0) + } + async fn update_following_status( + &self, + _: uuid::Uuid, + _: &str, + _: FollowingStatus, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn get_following_outbox_url( + &self, + _: uuid::Uuid, + _: &str, + ) -> anyhow::Result> { + Ok(None) + } + async fn migrate_follower_actor(&self, _: &str, _: &str) -> anyhow::Result> { + Ok(vec![]) + } } // ── ActorRepository ─────────────────────────────────────────────────────────── @@ -69,12 +142,38 @@ struct MemActorRepo; #[async_trait] impl ActorRepository for MemActorRepo { - async fn get_local_actor_keypair(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(None) } - async fn save_local_actor_keypair(&self, _: uuid::Uuid, _: String, _: String) -> anyhow::Result<()> { Ok(()) } - async fn upsert_remote_actor(&self, _: RemoteActor) -> anyhow::Result<()> { Ok(()) } - async fn get_remote_actor(&self, _: &str) -> anyhow::Result> { Ok(None) } - async fn add_announce(&self, _: &str, _: &str, _: &str, _: DateTime) -> anyhow::Result<()> { Ok(()) } - async fn count_announces(&self, _: &str) -> anyhow::Result { Ok(0) } + async fn get_local_actor_keypair( + &self, + _: uuid::Uuid, + ) -> anyhow::Result> { + Ok(None) + } + async fn save_local_actor_keypair( + &self, + _: uuid::Uuid, + _: String, + _: String, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn upsert_remote_actor(&self, _: RemoteActor) -> anyhow::Result<()> { + Ok(()) + } + async fn get_remote_actor(&self, _: &str) -> anyhow::Result> { + Ok(None) + } + async fn add_announce( + &self, + _: &str, + _: &str, + _: &str, + _: DateTime, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn count_announces(&self, _: &str) -> anyhow::Result { + Ok(0) + } } // ── BlocklistRepository ─────────────────────────────────────────────────────── @@ -85,13 +184,17 @@ struct MemBlocklistRepo { impl MemBlocklistRepo { fn with_blocked_domains(domains: impl IntoIterator) -> Self { - Self { blocked_domains: Mutex::new(domains.into_iter().collect()) } + Self { + blocked_domains: Mutex::new(domains.into_iter().collect()), + } } } impl Default for MemBlocklistRepo { fn default() -> Self { - Self { blocked_domains: Mutex::new(HashSet::new()) } + Self { + blocked_domains: Mutex::new(HashSet::new()), + } } } @@ -105,14 +208,24 @@ impl BlocklistRepository for MemBlocklistRepo { self.blocked_domains.lock().await.remove(domain); Ok(()) } - async fn get_blocked_domains(&self) -> anyhow::Result> { Ok(vec![]) } + async fn get_blocked_domains(&self) -> anyhow::Result> { + Ok(vec![]) + } async fn is_domain_blocked(&self, domain: &str) -> anyhow::Result { Ok(self.blocked_domains.lock().await.contains(domain)) } - async fn add_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } - async fn remove_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } - async fn get_blocked_actors(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } - async fn is_actor_blocked(&self, _: uuid::Uuid, _: &str) -> anyhow::Result { Ok(false) } + async fn add_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { + Ok(()) + } + async fn remove_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { + Ok(()) + } + async fn get_blocked_actors(&self, _: uuid::Uuid) -> anyhow::Result> { + Ok(vec![]) + } + async fn is_actor_blocked(&self, _: uuid::Uuid, _: &str) -> anyhow::Result { + Ok(false) + } } // ── ApUserRepository ────────────────────────────────────────────────────────── @@ -124,21 +237,24 @@ struct MemUserRepo { impl MemUserRepo { fn with_user(id: uuid::Uuid, username: &str) -> Self { let mut users = HashMap::new(); - users.insert(id, ApUser { + users.insert( id, - username: username.to_string(), - display_name: None, - bio: None, - avatar_url: None, - banner_url: None, - also_known_as: None, - profile_url: None, - attachment: vec![], - manually_approves_followers: true, - discoverable: true, - actor_type: ApActorType::Person, - featured_url: None, - }); + ApUser { + id, + username: username.to_string(), + display_name: None, + bio: None, + avatar_url: None, + banner_url: None, + also_known_as: None, + profile_url: None, + attachment: vec![], + manually_approves_followers: true, + discoverable: true, + actor_type: ApActorType::Person, + featured_url: None, + }, + ); Self { users } } } @@ -149,9 +265,15 @@ impl ApUserRepository for MemUserRepo { Ok(self.users.get(&id).cloned()) } async fn find_by_username(&self, username: &str) -> anyhow::Result> { - Ok(self.users.values().find(|u| u.username == username).cloned()) + Ok(self + .users + .values() + .find(|u| u.username == username) + .cloned()) + } + async fn count_users(&self) -> anyhow::Result { + Ok(self.users.len()) } - async fn count_users(&self) -> anyhow::Result { Ok(self.users.len()) } } // ── ApContentReader ─────────────────────────────────────────────────────────── @@ -161,9 +283,23 @@ struct MemContentReader; #[async_trait] impl ApContentReader for MemContentReader { - async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } - async fn get_local_objects_page(&self, _: uuid::Uuid, _: Option>, _: usize) -> anyhow::Result)>> { Ok(vec![]) } - async fn count_local_posts(&self) -> anyhow::Result { Ok(0) } + async fn get_local_objects_for_user( + &self, + _: uuid::Uuid, + ) -> anyhow::Result> { + Ok(vec![]) + } + async fn get_local_objects_page( + &self, + _: uuid::Uuid, + _: Option>, + _: usize, + ) -> anyhow::Result)>> { + Ok(vec![]) + } + async fn count_local_posts(&self) -> anyhow::Result { + Ok(0) + } } // ── ApObjectHandler ─────────────────────────────────────────────────────────── @@ -180,13 +316,27 @@ impl ApObjectHandler for MemHandler { self.creates.lock().await.push(ap_id.clone()); Ok(()) } - async fn on_update(&self, _: &Url, _: &Url, _: serde_json::Value) -> anyhow::Result<()> { Ok(()) } - async fn on_delete(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } - async fn on_actor_removed(&self, _: &Url) -> anyhow::Result<()> { Ok(()) } - async fn on_like(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } - async fn on_unlike(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } - async fn on_announce_received(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } - async fn on_announce_of_remote(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } + async fn on_update(&self, _: &Url, _: &Url, _: serde_json::Value) -> anyhow::Result<()> { + Ok(()) + } + async fn on_delete(&self, _: &Url, _: &Url) -> anyhow::Result<()> { + Ok(()) + } + async fn on_actor_removed(&self, _: &Url) -> anyhow::Result<()> { + Ok(()) + } + async fn on_like(&self, _: &Url, _: &Url) -> anyhow::Result<()> { + Ok(()) + } + async fn on_unlike(&self, _: &Url, _: &Url) -> anyhow::Result<()> { + Ok(()) + } + async fn on_announce_received(&self, _: &Url, _: &Url) -> anyhow::Result<()> { + Ok(()) + } + async fn on_announce_of_remote(&self, _: &Url, _: &Url) -> anyhow::Result<()> { + Ok(()) + } async fn on_mention(&self, ap_id: &Url, user_id: uuid::Uuid, _: &Url) -> anyhow::Result<()> { self.mentions.lock().await.push((ap_id.clone(), user_id)); Ok(()) @@ -264,9 +414,9 @@ async fn check_guards_blocks_domain() { use crate::activities::helpers::check_guards; use activitypub_federation::config::FederationConfig; - let blocklist_repo = Arc::new(MemBlocklistRepo::with_blocked_domains( - ["spam.example".to_string()], - )); + let blocklist_repo = Arc::new(MemBlocklistRepo::with_blocked_domains([ + "spam.example".to_string() + ])); let data_inner = make_data( Arc::new(MemActivityRepo::default()), Arc::new(MemFollowRepo), diff --git a/src/user.rs b/src/user.rs index a3f2e2c..8551221 100644 --- a/src/user.rs +++ b/src/user.rs @@ -26,7 +26,9 @@ pub enum ApVisibility { /// Actor type for AP serialization. Defaults to `Person`. #[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Default)] pub enum ApActorType { + #[default] Person, Service, Application, @@ -34,11 +36,6 @@ pub enum ApActorType { Group, } -impl Default for ApActorType { - fn default() -> Self { - Self::Person - } -} /// Resolved actor data returned by [`crate::service::ActivityPubService::lookup_actor_by_handle`]. /// Fetched via a signed HTTP request so strict instances (e.g. Threads) return full data. diff --git a/src/webfinger.rs b/src/webfinger.rs index e3d994f..15b2881 100644 --- a/src/webfinger.rs +++ b/src/webfinger.rs @@ -1,7 +1,4 @@ -use activitypub_federation::{ - config::Data, - fetch::webfinger::{extract_webfinger_name}, -}; +use activitypub_federation::{config::Data, fetch::webfinger::extract_webfinger_name}; use axum::{ extract::Query, http::header,