diff --git a/Cargo.lock b/Cargo.lock index f310611..72b3d9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1368,7 +1368,7 @@ dependencies = [ [[package]] name = "k-ap" -version = "0.2.0" +version = "0.3.0" dependencies = [ "activitypub_federation", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index f265a9e..6b4d497 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "k-ap" -version = "0.2.0" +version = "0.3.0" edition = "2024" description = "Generic ActivityPub protocol layer" license = "MIT" diff --git a/src/activities/accept.rs b/src/activities/accept.rs index 1d3b9d5..2f06753 100644 --- a/src/activities/accept.rs +++ b/src/activities/accept.rs @@ -46,7 +46,7 @@ impl Activity for AcceptActivity { } let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner()) .ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?; - data.federation_repo + data.follow_repo .update_following_status( local_user_id, self.actor.inner().as_str(), diff --git a/src/activities/announce.rs b/src/activities/announce.rs index e822035..cd199ef 100644 --- a/src/activities/announce.rs +++ b/src/activities/announce.rs @@ -57,7 +57,7 @@ impl Activity for AnnounceActivity { tracing::debug!(actor = %self.actor.inner(), object = %self.object, "received Announce of non-local object"); return Ok(()); } - data.federation_repo + data.actor_repo .add_announce( self.id.as_str(), self.object.as_str(), diff --git a/src/activities/block.rs b/src/activities/block.rs index 2fb355e..870441c 100644 --- a/src/activities/block.rs +++ b/src/activities/block.rs @@ -45,8 +45,8 @@ impl Activity for BlockActivity { return Ok(()); } if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { - let _ = data.federation_repo.remove_following(local_user_id, self.actor.inner().as_str()).await; - let _ = data.federation_repo.remove_follower(local_user_id, self.actor.inner().as_str()).await; + let _ = data.follow_repo.remove_following(local_user_id, self.actor.inner().as_str()).await; + let _ = data.follow_repo.remove_follower(local_user_id, self.actor.inner().as_str()).await; } tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower"); Ok(()) diff --git a/src/activities/follow.rs b/src/activities/follow.rs index c073b92..e7fd76b 100644 --- a/src/activities/follow.rs +++ b/src/activities/follow.rs @@ -57,7 +57,7 @@ impl Activity for FollowActivity { } // Actor block checked BEFORE any outbound HTTP fetch. if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) { - if data.federation_repo + if data.blocklist_repo .is_actor_blocked(target_user_id, self.actor.inner().as_str()) .await? { @@ -67,7 +67,7 @@ impl Activity for FollowActivity { } let _follower = self.actor.dereference(data).await?; let local_actor = self.object.dereference(data).await?; - data.federation_repo + data.follow_repo .add_follower( local_actor.user_id, self.actor.inner().as_str(), diff --git a/src/activities/helpers.rs b/src/activities/helpers.rs index 8347be5..c74bec0 100644 --- a/src/activities/helpers.rs +++ b/src/activities/helpers.rs @@ -9,13 +9,13 @@ use crate::error::Error; /// On repo error, skips the check rather than silently dropping the activity. pub(crate) async fn already_processed(activity_id: &Url, data: &Data) -> bool { let id = activity_id.as_str(); - match data.federation_repo.is_activity_processed(id).await { + match data.activity_repo.is_activity_processed(id).await { Ok(true) => { tracing::debug!(activity_id = id, "duplicate activity, skipping"); true } Ok(false) => { - if let Err(e) = data.federation_repo.mark_activity_processed(id).await { + if let Err(e) = data.activity_repo.mark_activity_processed(id).await { tracing::warn!(activity_id = id, error = %e, "failed to mark activity processed"); } false @@ -39,7 +39,7 @@ pub(crate) async fn check_guards( return Ok(true); } let domain = actor.host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { + if data.blocklist_repo.is_domain_blocked(domain).await? { tracing::info!(actor = %actor, "ignoring activity from blocked domain"); return Ok(true); } diff --git a/src/activities/move_act.rs b/src/activities/move_act.rs index 29ca4fd..4aeb37a 100644 --- a/src/activities/move_act.rs +++ b/src/activities/move_act.rs @@ -59,7 +59,7 @@ impl Activity for MoveActivity { ))); } let affected = data - .federation_repo + .follow_repo .migrate_follower_actor(self.object.as_str(), self.target.as_str()) .await .map_err(|e| Error::from(anyhow::anyhow!("{e}")))?; diff --git a/src/activities/reject.rs b/src/activities/reject.rs index 7c49f20..3d68892 100644 --- a/src/activities/reject.rs +++ b/src/activities/reject.rs @@ -44,7 +44,7 @@ impl Activity for RejectActivity { return Ok(()); } if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) { - data.federation_repo + data.follow_repo .remove_following(user_id, self.actor.inner().as_str()) .await?; } diff --git a/src/activities/undo.rs b/src/activities/undo.rs index fd4ff9c..5a60dfc 100644 --- a/src/activities/undo.rs +++ b/src/activities/undo.rs @@ -53,7 +53,7 @@ impl Activity for UndoActivity { && let Ok(url) = Url::parse(obj_url) && let Some(user_id) = crate::urls::extract_user_id_from_url(&url) { - data.federation_repo + data.follow_repo .remove_follower(user_id, self.actor.inner().as_str()) .await?; } diff --git a/src/actors.rs b/src/actors.rs index 56abfa7..04a3515 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -141,7 +141,7 @@ pub async fn get_local_actor( .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?; let (public_key, private_key) = match data - .federation_repo + .actor_repo .get_local_actor_keypair(user_id) .await? { @@ -151,7 +151,7 @@ pub async fn get_local_actor( // Zeroize the private key after storing it so the plaintext doesn't // linger in memory beyond this scope. let private_zeroized = Zeroizing::new(kp.private_key.clone()); - data.federation_repo + data.actor_repo .save_local_actor_keypair( user_id, kp.public_key.clone(), @@ -231,7 +231,7 @@ impl Object for DbActor { }; let keypair = data - .federation_repo + .actor_repo .get_local_actor_keypair(user_id) .await?; @@ -363,7 +363,7 @@ impl Object for DbActor { avatar_url: json.icon.as_ref().map(|i| i.url.to_string()), outbox_url: json.outbox.as_ref().map(|u| u.to_string()), }; - data.federation_repo.upsert_remote_actor(actor).await?; + data.actor_repo.upsert_remote_actor(actor).await?; let url_str = json.id.inner().to_string(); let user_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes()); diff --git a/src/content.rs b/src/content.rs index 589820d..95949dd 100644 --- a/src/content.rs +++ b/src/content.rs @@ -2,17 +2,19 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use url::Url; +/// Read side — the library queries this when sending content outward. +/// Implement on the same struct as [`ApObjectHandler`] if you prefer +/// a single database type. #[async_trait] -pub trait ApObjectHandler: Send + Sync { - /// Returns (ap_id, serialized object) for all local content owned by this user. - /// Used by outbox (count) and backfill (delivery). Must only return locally-authored content. +pub trait ApContentReader: Send + Sync { + /// All locally-authored objects for this user. Used by backfill on accept_follower. async fn get_local_objects_for_user( &self, user_id: uuid::Uuid, ) -> anyhow::Result>; - /// Returns up to `limit` objects ordered newest-first, published before `before`. - /// Returns (ap_id, object_json, published_at). + /// Newest-first page of locally-authored objects, published before `before`. + /// Returns `(ap_id, object_json, published_at)`. Used by the outbox endpoint. async fn get_local_objects_page( &self, user_id: uuid::Uuid, @@ -20,7 +22,13 @@ pub trait ApObjectHandler: Send + Sync { limit: usize, ) -> anyhow::Result)>>; - /// Incoming Create activity — persist remote content. + /// Total locally-authored posts across all users. Used by NodeInfo. + async fn count_local_posts(&self) -> anyhow::Result; +} + +/// Write side — the library calls these when processing inbound AP activities. +#[async_trait] +pub trait ApObjectHandler: Send + Sync { async fn on_create( &self, ap_id: &Url, @@ -28,7 +36,6 @@ pub trait ApObjectHandler: Send + Sync { object: serde_json::Value, ) -> anyhow::Result<()>; - /// Incoming Update activity — update existing remote content. async fn on_update( &self, ap_id: &Url, @@ -36,43 +43,30 @@ pub trait ApObjectHandler: Send + Sync { object: serde_json::Value, ) -> anyhow::Result<()>; - /// Incoming Delete activity — remove specific remote content. async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()>; - /// Actor unfollowed/was removed — clean up all their remote content. async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>; - /// Called when a remote actor likes a local thought. - /// `object_url` is the AP URL of the liked note (e.g. `{base}/thoughts/{uuid}`). - /// `actor_url` is the AP URL of the remote actor who sent the Like. async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - /// Called when a remote actor boosts (Announce) a local thought. - /// `object_url` is the AP URL of the announced note. - /// `actor_url` is the AP URL of the remote actor who sent the Announce. - async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - - /// Called when a remote actor removes a Like from a local thought. async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - /// Called when an inbound Note tags a local user with a Mention. - async fn on_mention( + async fn on_announce_received( &self, - thought_ap_id: &Url, - mentioned_user_uuid: uuid::Uuid, + object_url: &Url, actor_url: &Url, ) -> anyhow::Result<()>; - /// Called when a remote actor boosts (Announce) a non-local object. - /// Use this to surface cross-server boosts in followers' feeds. - /// `object_url` is the AP URL of the announced note. - /// `actor_url` is the AP URL of the remote actor who sent the Announce. async fn on_announce_of_remote( &self, object_url: &Url, actor_url: &Url, ) -> anyhow::Result<()>; - /// Total number of locally-authored posts across all users. - async fn count_local_posts(&self) -> anyhow::Result; + async fn on_mention( + &self, + thought_ap_id: &Url, + mentioned_user_uuid: uuid::Uuid, + actor_url: &Url, + ) -> anyhow::Result<()>; } diff --git a/src/data.rs b/src/data.rs index 56d0f3c..62d7a33 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,35 +1,25 @@ use std::sync::Arc; -use crate::content::ApObjectHandler; -use crate::repository::FederationRepository; +use crate::content::{ApContentReader, ApObjectHandler}; +use crate::repository::{ActivityRepository, ActorRepository, BlocklistRepository, FollowRepository}; use crate::user::ApUserRepository; -/// Typed event emitted by the federation layer. Consumers wire in an -/// [`EventPublisher`] to receive these and drive side effects (job queues, -/// webhooks, metrics, etc.). +/// Typed event emitted by the federation layer. /// -/// # Delivery flow -/// -/// When an `EventPublisher` is configured, outbound activities are NOT +/// When an [`EventPublisher`] is configured, outbound activities are NOT /// delivered directly — instead a [`FederationEvent::DeliveryRequested`] event -/// is published for each target inbox. The consumer's job queue should: +/// is published per inbox. The consumer's job queue should: /// 1. Persist the event. -/// 2. Call [`crate::service::ActivityPubService::deliver_to_inbox`] when -/// processing the queue item. +/// 2. Call [`crate::service::ActivityPubService::deliver_to_inbox`] when processing. /// -/// Without a publisher, the library falls back to fire-and-forget -/// `tokio::spawn` delivery (no persistence across restarts). +/// Without a publisher, the library falls back to `tokio::spawn` delivery. #[derive(Debug, Clone)] pub enum FederationEvent { - /// An outbound activity must be delivered to `inbox`. - /// Call `ActivityPubService::deliver_to_inbox(inbox, activity, signing_actor_id)`. DeliveryRequested { inbox: url::Url, activity: serde_json::Value, signing_actor_id: uuid::Uuid, }, - /// Delivery to `inbox` failed permanently after all in-process retries. - /// The consumer may schedule additional retries or alert. DeliveryFailed { inbox: url::Url, activity: serde_json::Value, @@ -38,10 +28,7 @@ pub enum FederationEvent { }, } -/// Receives typed federation events from the library. -/// -/// Implement this trait to bridge federation events into your application's -/// job queue, message broker, or metrics system. +/// Receives typed federation events. #[async_trait::async_trait] pub trait EventPublisher: Send + Sync { async fn publish(&self, event: FederationEvent) -> anyhow::Result<()>; @@ -49,8 +36,12 @@ pub trait EventPublisher: Send + Sync { #[derive(Clone)] pub struct FederationData { - pub(crate) federation_repo: Arc, + pub(crate) activity_repo: Arc, + pub(crate) follow_repo: Arc, + pub(crate) actor_repo: Arc, + pub(crate) blocklist_repo: Arc, pub(crate) user_repo: Arc, + pub(crate) content_reader: Arc, pub(crate) object_handler: Arc, pub(crate) base_url: String, pub(crate) domain: String, @@ -60,9 +51,14 @@ pub struct FederationData { } impl FederationData { + #[allow(clippy::too_many_arguments)] pub fn new( - federation_repo: Arc, + activity_repo: Arc, + follow_repo: Arc, + actor_repo: Arc, + blocklist_repo: Arc, user_repo: Arc, + content_reader: Arc, object_handler: Arc, base_url: String, allow_registration: bool, @@ -77,8 +73,12 @@ impl FederationData { .unwrap_or("") .to_string(); Self { - federation_repo, + activity_repo, + follow_repo, + actor_repo, + blocklist_repo, user_repo, + content_reader, object_handler, base_url, domain, diff --git a/src/followers_handler.rs b/src/followers_handler.rs index e5de463..26afb4b 100644 --- a/src/followers_handler.rs +++ b/src/followers_handler.rs @@ -33,8 +33,8 @@ async fn collection_handler( ); let total = match collection_type { - "followers" => data.federation_repo.count_followers(user_id).await, - _ => data.federation_repo.count_following(user_id).await, + "followers" => data.follow_repo.count_followers(user_id).await, + _ => data.follow_repo.count_following(user_id).await, } .map_err(Error::from)?; @@ -44,7 +44,7 @@ async fn collection_handler( let items: Vec = match collection_type { "followers" => data - .federation_repo + .follow_repo .get_followers_page(user_id, offset as u32, AP_PAGE_SIZE) .await .map_err(Error::from)? @@ -52,7 +52,7 @@ async fn collection_handler( .map(|f| f.actor.url) .collect(), _ => data - .federation_repo + .follow_repo .get_following_page(user_id, offset as u32, AP_PAGE_SIZE) .await .map_err(Error::from)? diff --git a/src/lib.rs b/src/lib.rs index f47fb33..ad88705 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,12 +17,13 @@ pub mod webfinger; pub use urls::AS_PUBLIC; pub use activitypub_federation::kinds::object::NoteType; -pub use content::ApObjectHandler; +pub use content::{ApContentReader, ApObjectHandler}; pub use data::{EventPublisher, FederationData, FederationEvent}; pub use error::Error; pub use federation::ApFederationConfig; pub use repository::{ - BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, + ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, + Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor, }; pub use service::ActivityPubService; pub use user::{ApActorType, ApProfileField, ApUser, ApUserRepository, ApVisibility, LookedUpActor}; diff --git a/src/nodeinfo.rs b/src/nodeinfo.rs index f619d75..db0d538 100644 --- a/src/nodeinfo.rs +++ b/src/nodeinfo.rs @@ -60,7 +60,7 @@ pub async fn nodeinfo_well_known_handler( pub async fn nodeinfo_handler(data: Data) -> Result, Error> { let user_count = data.user_repo.count_users().await.unwrap_or(0); - let local_posts = data.object_handler.count_local_posts().await.unwrap_or(0); + let local_posts = data.content_reader.count_local_posts().await.unwrap_or(0); Ok(Json(NodeInfo { version: "2.0".to_string(), diff --git a/src/outbox.rs b/src/outbox.rs index a56812a..767a488 100644 --- a/src/outbox.rs +++ b/src/outbox.rs @@ -66,7 +66,7 @@ pub async fn outbox_handler( // if count_local_posts returns 0. In practice this trait method is called // infrequently (only on the root collection endpoint). let total = data - .object_handler + .content_reader .count_local_posts() .await .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?; @@ -75,7 +75,7 @@ pub async fn outbox_handler( let before: Option> = query.before.as_deref().and_then(|s| s.parse().ok()); let items = data - .object_handler + .content_reader .get_local_objects_page(uuid, before, AP_PAGE_SIZE) .await .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?; diff --git a/src/repository.rs b/src/repository.rs deleted file mode 100644 index 51b2d5b..0000000 --- a/src/repository.rs +++ /dev/null @@ -1,160 +0,0 @@ -use anyhow::Result; -use async_trait::async_trait; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FollowerStatus { - Pending, - Accepted, - Rejected, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FollowingStatus { - Pending, - Accepted, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct RemoteActor { - pub url: String, - pub handle: String, - pub inbox_url: String, - pub shared_inbox_url: Option, - pub display_name: Option, - pub avatar_url: Option, - pub outbox_url: Option, -} - -#[derive(Debug, Clone)] -pub struct Follower { - pub actor: RemoteActor, - pub status: FollowerStatus, -} - -#[derive(Debug, Clone)] -pub struct BlockedDomain { - pub domain: String, - pub reason: Option, - pub blocked_at: String, -} - -#[async_trait] -pub trait FederationRepository: Send + Sync { - async fn add_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - status: FollowerStatus, - follow_activity_id: &str, - ) -> Result<()>; - async fn get_follower_follow_activity_id( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result>; - async fn remove_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result<()>; - async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result>; - async fn get_followers_page( - &self, - local_user_id: uuid::Uuid, - offset: u32, - limit: usize, - ) -> Result>; - async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result; - async fn get_following_page( - &self, - local_user_id: uuid::Uuid, - offset: u32, - limit: usize, - ) -> Result>; - async fn update_follower_status( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - status: FollowerStatus, - ) -> Result<()>; - async fn add_following( - &self, - local_user_id: uuid::Uuid, - actor: RemoteActor, - follow_activity_id: &str, - ) -> Result<()>; - async fn get_follow_activity_id( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result>; - async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; - async fn get_following(&self, local_user_id: uuid::Uuid) -> Result>; - async fn count_following(&self, local_user_id: uuid::Uuid) -> Result; - async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>; - async fn get_remote_actor(&self, actor_url: &str) -> Result>; - async fn get_local_actor_keypair( - &self, - user_id: uuid::Uuid, - ) -> Result>; - async fn save_local_actor_keypair( - &self, - user_id: uuid::Uuid, - public_key: String, - private_key: String, - ) -> Result<()>; - async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result>; - async fn update_following_status( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - status: FollowingStatus, - ) -> Result<()>; - async fn get_following_outbox_url( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result>; - async fn add_announce( - &self, - activity_id: &str, - object_url: &str, - actor_url: &str, - announced_at: chrono::DateTime, - ) -> Result<()>; - async fn count_announces(&self, object_url: &str) -> Result; - async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()>; - async fn remove_blocked_domain(&self, domain: &str) -> Result<()>; - async fn get_blocked_domains(&self) -> Result>; - async fn is_domain_blocked(&self, domain: &str) -> Result; - async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; - async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; - async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result>; - async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result; - /// Migrate all local following records from old_actor_url to new_actor_url. - /// Returns the local user IDs whose records were migrated (excludes users - /// already following the new actor — they need no re-follow). - async fn migrate_follower_actor( - &self, - old_actor_url: &str, - new_actor_url: &str, - ) -> Result>; - - /// Return `true` if an activity with `activity_id` has already been processed. - /// Implementations should enforce a UNIQUE constraint on the stored activity IDs - /// so concurrent duplicate deliveries are safely rejected. - async fn is_activity_processed(&self, activity_id: &str) -> Result; - - /// Record `activity_id` as processed. Called immediately before dispatching - /// each inbound activity so that retried deliveries are no-ops. - async fn mark_activity_processed(&self, activity_id: &str) -> Result<()>; - - /// Return deduplicated inbox URLs (shared_inbox preferred over personal inbox) - /// for all **accepted** followers of `local_user_id`, excluding any actors or - /// domains that are blocked. Implementations should perform filtering and - /// deduplication in the database rather than in application memory. - async fn get_accepted_follower_inboxes( - &self, - local_user_id: uuid::Uuid, - ) -> Result>; -} diff --git a/src/repository/activity.rs b/src/repository/activity.rs new file mode 100644 index 0000000..7539bcb --- /dev/null +++ b/src/repository/activity.rs @@ -0,0 +1,11 @@ +use anyhow::Result; +use async_trait::async_trait; + +/// Tracks which inbound AP activity IDs have already been processed. +/// Prevents duplicate handling when remote servers retry delivery. +/// Implementations should enforce a UNIQUE constraint on stored IDs. +#[async_trait] +pub trait ActivityRepository: Send + Sync { + async fn is_activity_processed(&self, activity_id: &str) -> Result; + async fn mark_activity_processed(&self, activity_id: &str) -> Result<()>; +} diff --git a/src/repository/actor.rs b/src/repository/actor.rs new file mode 100644 index 0000000..dfb436e --- /dev/null +++ b/src/repository/actor.rs @@ -0,0 +1,34 @@ +use anyhow::Result; +use async_trait::async_trait; + +use super::RemoteActor; + +/// Manages local actor keypairs, remote actor cache, and Announce tracking. +#[async_trait] +pub trait ActorRepository: Send + Sync { + // ── Local keypairs ────────────────────────────────────────────────────── + async fn get_local_actor_keypair( + &self, + user_id: uuid::Uuid, + ) -> Result>; + async fn save_local_actor_keypair( + &self, + user_id: uuid::Uuid, + public_key: String, + private_key: String, + ) -> Result<()>; + + // ── Remote actor cache ────────────────────────────────────────────────── + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>; + async fn get_remote_actor(&self, actor_url: &str) -> Result>; + + // ── Boost (Announce) tracking ─────────────────────────────────────────── + async fn add_announce( + &self, + activity_id: &str, + object_url: &str, + actor_url: &str, + announced_at: chrono::DateTime, + ) -> Result<()>; + async fn count_announces(&self, object_url: &str) -> Result; +} diff --git a/src/repository/blocklist.rs b/src/repository/blocklist.rs new file mode 100644 index 0000000..80ae5c4 --- /dev/null +++ b/src/repository/blocklist.rs @@ -0,0 +1,35 @@ +use anyhow::Result; +use async_trait::async_trait; + +use super::BlockedDomain; + +/// Domain and actor-level blocklists. +#[async_trait] +pub trait BlocklistRepository: Send + Sync { + // ── Domain blocklist ──────────────────────────────────────────────────── + async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()>; + async fn remove_blocked_domain(&self, domain: &str) -> Result<()>; + async fn get_blocked_domains(&self) -> Result>; + async fn is_domain_blocked(&self, domain: &str) -> Result; + + // ── Per-user actor blocklist ──────────────────────────────────────────── + async fn add_blocked_actor( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> Result<()>; + async fn remove_blocked_actor( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> Result<()>; + async fn get_blocked_actors( + &self, + local_user_id: uuid::Uuid, + ) -> Result>; + async fn is_actor_blocked( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> Result; +} diff --git a/src/repository/follow.rs b/src/repository/follow.rs new file mode 100644 index 0000000..00ea37b --- /dev/null +++ b/src/repository/follow.rs @@ -0,0 +1,97 @@ +use anyhow::Result; +use async_trait::async_trait; + +use super::{Follower, FollowerStatus, FollowingStatus, RemoteActor}; + +/// Manages follower/following relationships and account migration. +#[async_trait] +pub trait FollowRepository: Send + Sync { + // ── Inbound followers ─────────────────────────────────────────────────── + async fn add_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + follow_activity_id: &str, + ) -> Result<()>; + async fn get_follower_follow_activity_id( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result>; + async fn remove_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result<()>; + async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result>; + async fn get_followers_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result>; + async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result; + async fn update_follower_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + ) -> Result<()>; + async fn get_pending_followers( + &self, + local_user_id: uuid::Uuid, + ) -> Result>; + /// Return deduplicated inbox URLs (shared_inbox preferred) for accepted + /// followers, excluding blocked actors/domains. DB-side filtering. + async fn get_accepted_follower_inboxes( + &self, + local_user_id: uuid::Uuid, + ) -> Result>; + + // ── Outbound following ────────────────────────────────────────────────── + async fn add_following( + &self, + local_user_id: uuid::Uuid, + actor: RemoteActor, + follow_activity_id: &str, + ) -> Result<()>; + async fn get_follow_activity_id( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result>; + async fn remove_following( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> Result<()>; + async fn get_following(&self, local_user_id: uuid::Uuid) -> Result>; + async fn get_following_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result>; + async fn count_following(&self, local_user_id: uuid::Uuid) -> Result; + async fn update_following_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowingStatus, + ) -> Result<()>; + async fn get_following_outbox_url( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result>; + + // ── Account migration ─────────────────────────────────────────────────── + /// Migrate all follower records from `old_actor_url` to `new_actor_url`. + /// Returns local user IDs that need a re-follow sent. + async fn migrate_follower_actor( + &self, + old_actor_url: &str, + new_actor_url: &str, + ) -> Result>; +} diff --git a/src/repository/mod.rs b/src/repository/mod.rs new file mode 100644 index 0000000..cb16689 --- /dev/null +++ b/src/repository/mod.rs @@ -0,0 +1,46 @@ +mod activity; +mod actor; +mod blocklist; +mod follow; + +pub use activity::ActivityRepository; +pub use actor::ActorRepository; +pub use blocklist::BlocklistRepository; +pub use follow::FollowRepository; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FollowerStatus { + Pending, + Accepted, + Rejected, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FollowingStatus { + Pending, + Accepted, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteActor { + pub url: String, + pub handle: String, + pub inbox_url: String, + pub shared_inbox_url: Option, + pub display_name: Option, + pub avatar_url: Option, + pub outbox_url: Option, +} + +#[derive(Debug, Clone)] +pub struct Follower { + pub actor: RemoteActor, + pub status: FollowerStatus, +} + +#[derive(Debug, Clone)] +pub struct BlockedDomain { + pub domain: String, + pub reason: Option, + pub blocked_at: String, +} diff --git a/src/service/backfill.rs b/src/service/backfill.rs index 73539d8..abd87f8 100644 --- a/src/service/backfill.rs +++ b/src/service/backfill.rs @@ -86,7 +86,7 @@ impl ActivityPubService { loop { let page = data - .object_handler + .content_reader .get_local_objects_page(owner_user_id, before, BATCH_SIZE) .await?; diff --git a/src/service/follow.rs b/src/service/follow.rs index 328e578..e07659a 100644 --- a/src/service/follow.rs +++ b/src/service/follow.rs @@ -33,7 +33,7 @@ impl ActivityPubService { outbox_url: Some(remote_actor.outbox_url.to_string()), }; // Save BEFORE delivering — prevents lost state on process restart. - data.federation_repo.add_following(local_user_id, remote, &follow_id_str).await?; + data.follow_repo.add_following(local_user_id, remote, &follow_id_str).await?; let follow = FollowActivity { id: Url::parse(&follow_id_str)?, kind: Default::default(), @@ -49,12 +49,12 @@ impl ActivityPubService { if actor_url_str.starts_with(&self.base_url) { return self.unfollow_local(local_user_id, actor_url_str, &data).await; } - let remote = data.federation_repo.get_remote_actor(actor_url_str).await? + let remote = data.actor_repo.get_remote_actor(actor_url_str).await? .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let remote_ap_id = Url::parse(actor_url_str)?; let inbox = Url::parse(&remote.inbox_url)?; - let follow_id = data.federation_repo.get_follow_activity_id(local_user_id, actor_url_str).await? + let follow_id = data.follow_repo.get_follow_activity_id(local_user_id, actor_url_str).await? .and_then(|id| Url::parse(&id).ok()) .unwrap_or_else(|| activity_url(&self.base_url).unwrap_or_else(|_| remote_ap_id.clone())); let follow = FollowActivity { id: follow_id, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: ObjectId::from(remote_ap_id) }; @@ -66,7 +66,7 @@ impl ActivityPubService { }; let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], undo).await?; self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; - data.federation_repo.remove_following(local_user_id, actor_url_str).await?; + data.follow_repo.remove_following(local_user_id, actor_url_str).await?; data.object_handler.on_actor_removed(&Url::parse(actor_url_str)?).await?; Ok(()) } @@ -74,13 +74,13 @@ impl ActivityPubService { pub async fn accept_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let remote_actor = data.federation_repo.get_remote_actor(remote_actor_url).await? + let remote_actor = data.actor_repo.get_remote_actor(remote_actor_url).await? .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - let follow_id_str = data.federation_repo.get_follower_follow_activity_id(local_user_id, remote_actor_url).await? + let follow_id_str = data.follow_repo.get_follower_follow_activity_id(local_user_id, remote_actor_url).await? .ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?; let follow = FollowActivity { id: Url::parse(&follow_id_str)?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; let accept = AcceptActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; - data.federation_repo.update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted).await?; + data.follow_repo.update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted).await?; let inbox = Url::parse(&remote_actor.inbox_url)?; let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], accept).await?; self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; @@ -92,25 +92,25 @@ impl ActivityPubService { pub async fn reject_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let remote_actor = data.federation_repo.get_remote_actor(remote_actor_url).await? + let remote_actor = data.actor_repo.get_remote_actor(remote_actor_url).await? .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; let follow = FollowActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; let reject = RejectActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; let inbox = Url::parse(&remote_actor.inbox_url)?; let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], reject).await?; self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; - data.federation_repo.remove_follower(local_user_id, remote_actor_url).await?; + data.follow_repo.remove_follower(local_user_id, remote_actor_url).await?; Ok(()) } pub async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { let data = self.federation_config.to_request_data(); - data.federation_repo.get_pending_followers(local_user_id).await + data.follow_repo.get_pending_followers(local_user_id).await } pub async fn get_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { let data = self.federation_config.to_request_data(); - Ok(data.federation_repo.get_followers(local_user_id).await? + Ok(data.follow_repo.get_followers(local_user_id).await? .into_iter() .filter(|f| f.status == FollowerStatus::Accepted) .map(|f| f.actor) @@ -119,7 +119,7 @@ impl ActivityPubService { pub async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result { let data = self.federation_config.to_request_data(); - Ok(data.federation_repo.get_followers(local_user_id).await? + Ok(data.follow_repo.get_followers(local_user_id).await? .into_iter() .filter(|f| f.status == FollowerStatus::Accepted) .count()) @@ -127,26 +127,26 @@ impl ActivityPubService { pub async fn get_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { let data = self.federation_config.to_request_data(); - data.federation_repo.get_following(local_user_id).await + data.follow_repo.get_following(local_user_id).await } pub async fn count_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result { let data = self.federation_config.to_request_data(); - data.federation_repo.count_following(local_user_id).await + data.follow_repo.count_following(local_user_id).await } pub async fn remove_follower(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.federation_repo.remove_follower(local_user_id, actor_url).await + data.follow_repo.remove_follower(local_user_id, actor_url).await } pub async fn block_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.federation_repo.add_blocked_actor(local_user_id, actor_url).await?; - let _ = data.federation_repo.remove_follower(local_user_id, actor_url).await; - let _ = data.federation_repo.remove_following(local_user_id, actor_url).await; + data.blocklist_repo.add_blocked_actor(local_user_id, actor_url).await?; + let _ = data.follow_repo.remove_follower(local_user_id, actor_url).await; + let _ = data.follow_repo.remove_following(local_user_id, actor_url).await; let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - if let Ok(Some(remote_actor)) = data.federation_repo.get_remote_actor(actor_url).await { + if let Ok(Some(remote_actor)) = data.actor_repo.get_remote_actor(actor_url).await { let block = crate::activities::BlockActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), @@ -162,15 +162,15 @@ impl ActivityPubService { pub async fn unblock_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.federation_repo.remove_blocked_actor(local_user_id, actor_url).await + data.blocklist_repo.remove_blocked_actor(local_user_id, actor_url).await } pub async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { let data = self.federation_config.to_request_data(); - let actor_urls = data.federation_repo.get_blocked_actors(local_user_id).await?; + let actor_urls = data.blocklist_repo.get_blocked_actors(local_user_id).await?; let mut actors = Vec::new(); for url in actor_urls { - let actor = match data.federation_repo.get_remote_actor(&url).await { + let actor = match data.actor_repo.get_remote_actor(&url).await { Ok(Some(a)) => a, _ => RemoteActor { url: url.clone(), handle: url.clone(), inbox_url: url.clone(), shared_inbox_url: None, display_name: None, avatar_url: None, outbox_url: None }, }; @@ -193,7 +193,7 @@ impl ActivityPubService { let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); let target_actor_url = crate::urls::actor_url(&self.base_url, target.id); let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?.to_string(); - data.federation_repo.add_follower(target.id, &follower_actor_url, FollowerStatus::Accepted, &follow_id).await?; + data.follow_repo.add_follower(target.id, &follower_actor_url, FollowerStatus::Accepted, &follow_id).await?; let target_as_remote = RemoteActor { url: target_actor_url.to_string(), handle: format!("{}@{}", target.username, data.domain), @@ -203,8 +203,8 @@ impl ActivityPubService { avatar_url: None, outbox_url: None, }; - data.federation_repo.add_following(local_user_id, target_as_remote, &follow_id).await?; - data.federation_repo.update_following_status(local_user_id, target_actor_url.as_ref(), FollowingStatus::Accepted).await?; + data.follow_repo.add_following(local_user_id, target_as_remote, &follow_id).await?; + data.follow_repo.update_following_status(local_user_id, target_actor_url.as_ref(), FollowingStatus::Accepted).await?; tracing::info!(follower = %local_user_id, followee = %target.id, "local follow"); Ok(()) } @@ -219,8 +219,8 @@ impl ActivityPubService { let target_user_id = crate::urls::extract_user_id_from_url(&target_url) .ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?; let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); - data.federation_repo.remove_follower(target_user_id, &local_actor_url).await?; - data.federation_repo.remove_following(local_user_id, target_actor_url).await?; + data.follow_repo.remove_follower(target_user_id, &local_actor_url).await?; + data.follow_repo.remove_following(local_user_id, target_actor_url).await?; tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow"); Ok(()) } diff --git a/src/service/mod.rs b/src/service/mod.rs index b073ec1..fae319e 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -10,14 +10,18 @@ use url::Url; use crate::{ actor_handler::actor_handler, actors::{DbActor, get_local_actor}, - content::ApObjectHandler, + content::{ApContentReader, ApObjectHandler}, data::FederationData, federation::ApFederationConfig, followers_handler::{followers_handler, following_handler}, inbox::inbox_handler, nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, outbox::outbox_handler, - repository::{BlockedDomain, FederationRepository}, + repository::{ + ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, + FollowRepository, FollowerStatus, FollowingStatus, RemoteActor, + }, + urls::activity_url, user::ApUserRepository, webfinger::webfinger_handler, }; @@ -45,9 +49,13 @@ pub struct ActivityPubService { } pub struct ActivityPubServiceBuilder { - repo: Arc, - user_repo: Arc, - object_handler: Arc, + activity_repo: Option>, + follow_repo: Option>, + actor_repo: Option>, + blocklist_repo: Option>, + user_repo: Option>, + content_reader: Option>, + object_handler: Option>, base_url: String, allow_registration: bool, software_name: String, @@ -58,19 +66,56 @@ pub struct ActivityPubServiceBuilder { } impl ActivityPubServiceBuilder { + pub fn activity_repo(mut self, v: Arc) -> Self { + self.activity_repo = Some(v); self + } + pub fn follow_repo(mut self, v: Arc) -> Self { + self.follow_repo = Some(v); self + } + pub fn actor_repo(mut self, v: Arc) -> Self { + self.actor_repo = Some(v); self + } + pub fn blocklist_repo(mut self, v: Arc) -> Self { + self.blocklist_repo = Some(v); self + } + pub fn user_repo(mut self, v: Arc) -> Self { + self.user_repo = Some(v); self + } + pub fn content_reader(mut self, v: Arc) -> Self { + self.content_reader = Some(v); self + } + pub fn object_handler(mut self, v: Arc) -> Self { + self.object_handler = Some(v); self + } pub fn allow_registration(mut self, v: bool) -> Self { self.allow_registration = v; self } pub fn software_name(mut self, v: impl Into) -> Self { self.software_name = v.into(); self } pub fn debug(mut self, v: bool) -> Self { self.debug = v; self } - pub fn event_publisher(mut self, v: Arc) -> Self { self.event_publisher = Some(v); self } - /// Override max delivery retries (default: `DELIVERY_MAX_ATTEMPTS`). + pub fn event_publisher(mut self, v: Arc) -> Self { + self.event_publisher = Some(v); self + } pub fn delivery_max_attempts(mut self, v: u32) -> Self { self.delivery_max_attempts = v; self } - /// Override initial retry backoff in seconds (default: `DELIVERY_INITIAL_DELAY_SECS`). pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { self.delivery_initial_delay_secs = v; self } pub async fn build(self) -> anyhow::Result { + let activity_repo = self.activity_repo + .ok_or_else(|| anyhow::anyhow!("activity_repo required — call .activity_repo(arc)"))?; + let follow_repo = self.follow_repo + .ok_or_else(|| anyhow::anyhow!("follow_repo required — call .follow_repo(arc)"))?; + let actor_repo = self.actor_repo + .ok_or_else(|| anyhow::anyhow!("actor_repo required — call .actor_repo(arc)"))?; + let blocklist_repo = self.blocklist_repo + .ok_or_else(|| anyhow::anyhow!("blocklist_repo required — call .blocklist_repo(arc)"))?; + let user_repo = self.user_repo + .ok_or_else(|| anyhow::anyhow!("user_repo required — call .user_repo(arc)"))?; + let content_reader = self.content_reader + .ok_or_else(|| anyhow::anyhow!("content_reader required — call .content_reader(arc)"))?; + let object_handler = self.object_handler + .ok_or_else(|| anyhow::anyhow!("object_handler required — call .object_handler(arc)"))?; let data = FederationData::new( - self.repo, self.user_repo, self.object_handler, self.base_url.clone(), - self.allow_registration, self.software_name, self.event_publisher, + activity_repo, follow_repo, actor_repo, blocklist_repo, + user_repo, content_reader, object_handler, + self.base_url.clone(), self.allow_registration, self.software_name, + self.event_publisher, ); let federation_config = ApFederationConfig::new(data, self.debug).await?; Ok(ActivityPubService { @@ -83,16 +128,20 @@ impl ActivityPubServiceBuilder { } impl ActivityPubService { - pub fn builder( - repo: Arc, - user_repo: Arc, - object_handler: Arc, - base_url: impl Into, - ) -> ActivityPubServiceBuilder { + pub fn builder(base_url: impl Into) -> ActivityPubServiceBuilder { ActivityPubServiceBuilder { - repo, user_repo, object_handler, base_url: base_url.into(), - allow_registration: false, software_name: String::new(), - debug: false, event_publisher: None, + activity_repo: None, + follow_repo: None, + actor_repo: None, + blocklist_repo: None, + user_repo: None, + content_reader: None, + object_handler: None, + base_url: base_url.into(), + allow_registration: false, + software_name: String::new(), + debug: false, + event_publisher: None, delivery_max_attempts: DELIVERY_MAX_ATTEMPTS, delivery_initial_delay_secs: DELIVERY_INITIAL_DELAY_SECS, } @@ -133,11 +182,11 @@ impl ActivityPubService { const PAGE_SIZE: usize = 20; let data = self.federation_config.to_request_data(); let collection_id = format!("{}/users/{}/followers", self.base_url, user_id); - let total = data.federation_repo.count_followers(user_id).await?; + let total = data.follow_repo.count_followers(user_id).await?; let obj = if let Some(p) = page { let p = p.max(1); let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; - let followers = data.federation_repo.get_followers_page(user_id, offset as u32, PAGE_SIZE).await?; + let followers = data.follow_repo.get_followers_page(user_id, offset as u32, PAGE_SIZE).await?; let has_next = offset + followers.len() < total; let items: Vec = followers.into_iter().map(|f| f.actor.url).collect(); let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); @@ -154,11 +203,11 @@ impl ActivityPubService { const PAGE_SIZE: usize = 20; let data = self.federation_config.to_request_data(); let collection_id = format!("{}/users/{}/following", self.base_url, user_id); - let total = data.federation_repo.count_following(user_id).await?; + let total = data.follow_repo.count_following(user_id).await?; let obj = if let Some(p) = page { let p = p.max(1); let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; - let following = data.federation_repo.get_following_page(user_id, offset as u32, PAGE_SIZE).await?; + let following = data.follow_repo.get_following_page(user_id, offset as u32, PAGE_SIZE).await?; let has_next = offset + following.len() < total; let items: Vec = following.into_iter().map(|a| a.url).collect(); let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); @@ -172,12 +221,12 @@ impl ActivityPubService { pub async fn mark_follower_accepted(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.federation_repo.update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted).await.map_err(|e| anyhow::anyhow!("{e}")) + data.follow_repo.update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted).await.map_err(|e| anyhow::anyhow!("{e}")) } pub async fn mark_follower_rejected(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.federation_repo.remove_follower(user_id, actor_url).await.map_err(|e| anyhow::anyhow!("{e}")) + data.follow_repo.remove_follower(user_id, actor_url).await.map_err(|e| anyhow::anyhow!("{e}")) } pub async fn lookup_actor_by_handle(&self, handle: &str) -> anyhow::Result { @@ -200,17 +249,17 @@ impl ActivityPubService { pub async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.federation_repo.add_blocked_domain(domain, reason).await + data.blocklist_repo.add_blocked_domain(domain, reason).await } pub async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - data.federation_repo.remove_blocked_domain(domain).await + data.blocklist_repo.remove_blocked_domain(domain).await } pub async fn get_blocked_domains(&self) -> anyhow::Result> { let data = self.federation_config.to_request_data(); - data.federation_repo.get_blocked_domains().await + data.blocklist_repo.get_blocked_domains().await } // ── Private helpers (accessible to child modules via Rust's privacy rules) ─ @@ -221,7 +270,7 @@ impl ActivityPubService { local_user_id: uuid::Uuid, ) -> anyhow::Result)>> { let local_actor = get_local_actor(local_user_id, data).await.map_err(|e| anyhow::anyhow!("{e}"))?; - let inbox_strs = data.federation_repo.get_accepted_follower_inboxes(local_user_id).await?; + let inbox_strs = data.follow_repo.get_accepted_follower_inboxes(local_user_id).await?; if inbox_strs.is_empty() { return Ok(None); } let inboxes: Vec = inbox_strs.into_iter().filter_map(|s| { Url::parse(&s).map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")).ok() diff --git a/src/tests/integration.rs b/src/tests/integration.rs index ce74742..fbd55e0 100644 --- a/src/tests/integration.rs +++ b/src/tests/integration.rs @@ -1,9 +1,5 @@ -/// Integration tests exercising multi-component flows with in-memory trait stubs. -/// -/// These tests don't spin up an HTTP server but they do exercise: -/// - `check_guards` idempotency (is_activity_processed → mark → duplicate rejected) -/// - `extract_and_dispatch_mentions` dispatches on_mention for local actors -/// - Multiple trait implementations wired through FederationData +// src/tests/integration.rs +/// Integration tests with in-memory trait stubs. use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -13,23 +9,23 @@ use chrono::{DateTime, Utc}; use tokio::sync::Mutex; use url::Url; -use crate::content::ApObjectHandler; +use crate::content::{ApContentReader, ApObjectHandler}; use crate::data::FederationData; use crate::repository::{ - BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, + ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, + Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor, }; use crate::user::{ApActorType, ApProfileField, ApUser, ApUserRepository}; -// ── In-memory FederationRepository ─────────────────────────────────────────── +// ── ActivityRepository ──────────────────────────────────────────────────────── #[derive(Default)] -struct MemRepo { +struct MemActivityRepo { processed: Mutex>, - blocked_domains: Mutex>, } #[async_trait] -impl FederationRepository for MemRepo { +impl ActivityRepository for MemActivityRepo { async fn is_activity_processed(&self, id: &str) -> anyhow::Result { Ok(self.processed.lock().await.contains(id)) } @@ -37,44 +33,89 @@ impl FederationRepository for MemRepo { self.processed.lock().await.insert(id.to_string()); Ok(()) } - async fn is_domain_blocked(&self, domain: &str) -> anyhow::Result { - Ok(self.blocked_domains.lock().await.contains(domain)) - } - // ── stubs ──────────────────────────────────────────────────────────────── +} + +// ── FollowRepository ────────────────────────────────────────────────────────── + +#[derive(Default)] +struct MemFollowRepo; + +#[async_trait] +impl FollowRepository for MemFollowRepo { async fn add_follower(&self, _: uuid::Uuid, _: &str, _: FollowerStatus, _: &str) -> anyhow::Result<()> { Ok(()) } async fn get_follower_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result> { Ok(None) } async fn remove_follower(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn get_followers(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } async fn get_followers_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result> { Ok(vec![]) } async fn count_followers(&self, _: uuid::Uuid) -> anyhow::Result { Ok(0) } - async fn get_following_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result> { Ok(vec![]) } async fn update_follower_status(&self, _: uuid::Uuid, _: &str, _: FollowerStatus) -> anyhow::Result<()> { Ok(()) } + async fn get_pending_followers(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } + async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> { Ok(()) } async fn get_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result> { Ok(None) } async fn remove_following(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn get_following(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } + async fn get_following_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result> { Ok(vec![]) } async fn count_following(&self, _: uuid::Uuid) -> anyhow::Result { Ok(0) } - async fn upsert_remote_actor(&self, _: RemoteActor) -> anyhow::Result<()> { Ok(()) } - async fn get_remote_actor(&self, _: &str) -> anyhow::Result> { Ok(None) } - async fn get_local_actor_keypair(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(None) } - async fn save_local_actor_keypair(&self, _: uuid::Uuid, _: String, _: String) -> anyhow::Result<()> { Ok(()) } - async fn get_pending_followers(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } async fn update_following_status(&self, _: uuid::Uuid, _: &str, _: FollowingStatus) -> anyhow::Result<()> { Ok(()) } async fn get_following_outbox_url(&self, _: uuid::Uuid, _: &str) -> anyhow::Result> { Ok(None) } + async fn migrate_follower_actor(&self, _: &str, _: &str) -> anyhow::Result> { Ok(vec![]) } +} + +// ── ActorRepository ─────────────────────────────────────────────────────────── + +#[derive(Default)] +struct MemActorRepo; + +#[async_trait] +impl ActorRepository for MemActorRepo { + async fn get_local_actor_keypair(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(None) } + async fn save_local_actor_keypair(&self, _: uuid::Uuid, _: String, _: String) -> anyhow::Result<()> { Ok(()) } + async fn upsert_remote_actor(&self, _: RemoteActor) -> anyhow::Result<()> { Ok(()) } + async fn get_remote_actor(&self, _: &str) -> anyhow::Result> { Ok(None) } async fn add_announce(&self, _: &str, _: &str, _: &str, _: DateTime) -> anyhow::Result<()> { Ok(()) } async fn count_announces(&self, _: &str) -> anyhow::Result { Ok(0) } - async fn add_blocked_domain(&self, _: &str, _: Option<&str>) -> anyhow::Result<()> { Ok(()) } - async fn remove_blocked_domain(&self, _: &str) -> anyhow::Result<()> { Ok(()) } +} + +// ── BlocklistRepository ─────────────────────────────────────────────────────── + +struct MemBlocklistRepo { + blocked_domains: Mutex>, +} + +impl MemBlocklistRepo { + fn with_blocked_domains(domains: impl IntoIterator) -> Self { + Self { blocked_domains: Mutex::new(domains.into_iter().collect()) } + } +} + +impl Default for MemBlocklistRepo { + fn default() -> Self { + Self { blocked_domains: Mutex::new(HashSet::new()) } + } +} + +#[async_trait] +impl BlocklistRepository for MemBlocklistRepo { + async fn add_blocked_domain(&self, domain: &str, _: Option<&str>) -> anyhow::Result<()> { + self.blocked_domains.lock().await.insert(domain.to_string()); + Ok(()) + } + async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> { + self.blocked_domains.lock().await.remove(domain); + Ok(()) + } async fn get_blocked_domains(&self) -> anyhow::Result> { Ok(vec![]) } + async fn is_domain_blocked(&self, domain: &str) -> anyhow::Result { + Ok(self.blocked_domains.lock().await.contains(domain)) + } async fn add_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn remove_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn get_blocked_actors(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } async fn is_actor_blocked(&self, _: uuid::Uuid, _: &str) -> anyhow::Result { Ok(false) } - async fn migrate_follower_actor(&self, _: &str, _: &str) -> anyhow::Result> { Ok(vec![]) } - async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } } -// ── In-memory ApUserRepository ──────────────────────────────────────────────── +// ── ApUserRepository ────────────────────────────────────────────────────────── struct MemUserRepo { users: HashMap, @@ -113,7 +154,19 @@ impl ApUserRepository for MemUserRepo { async fn count_users(&self) -> anyhow::Result { Ok(self.users.len()) } } -// ── In-memory ApObjectHandler ───────────────────────────────────────────────── +// ── ApContentReader ─────────────────────────────────────────────────────────── + +#[derive(Default)] +struct MemContentReader; + +#[async_trait] +impl ApContentReader for MemContentReader { + async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } + async fn get_local_objects_page(&self, _: uuid::Uuid, _: Option>, _: usize) -> anyhow::Result)>> { Ok(vec![]) } + async fn count_local_posts(&self) -> anyhow::Result { Ok(0) } +} + +// ── ApObjectHandler ─────────────────────────────────────────────────────────── #[derive(Default)] struct MemHandler { @@ -123,8 +176,6 @@ struct MemHandler { #[async_trait] impl ApObjectHandler for MemHandler { - async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } - async fn get_local_objects_page(&self, _: uuid::Uuid, _: Option>, _: usize) -> anyhow::Result)>> { Ok(vec![]) } async fn on_create(&self, ap_id: &Url, _: &Url, _: serde_json::Value) -> anyhow::Result<()> { self.creates.lock().await.push(ap_id.clone()); Ok(()) @@ -133,26 +184,33 @@ impl ApObjectHandler for MemHandler { async fn on_delete(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_actor_removed(&self, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_like(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } - async fn on_announce_received(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_unlike(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } + async fn on_announce_received(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } + async fn on_announce_of_remote(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_mention(&self, ap_id: &Url, user_id: uuid::Uuid, _: &Url) -> anyhow::Result<()> { self.mentions.lock().await.push((ap_id.clone(), user_id)); Ok(()) } - async fn on_announce_of_remote(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } - async fn count_local_posts(&self) -> anyhow::Result { Ok(0) } } // ── Helper ──────────────────────────────────────────────────────────────────── fn make_data( - repo: Arc, + activity_repo: Arc, + follow_repo: Arc, + actor_repo: Arc, + blocklist_repo: Arc, user_repo: Arc, + content_reader: Arc, handler: Arc, ) -> FederationData { FederationData::new( - repo, + activity_repo, + follow_repo, + actor_repo, + blocklist_repo, user_repo, + content_reader, handler, "https://example.com".to_string(), false, @@ -165,14 +223,19 @@ fn make_data( #[tokio::test] async fn check_guards_idempotency() { - use crate::activities::helpers::{already_processed, check_guards}; + use crate::activities::helpers::check_guards; use activitypub_federation::config::FederationConfig; - let repo = Arc::new(MemRepo::default()); - let user_repo = Arc::new(MemUserRepo::with_user(uuid::Uuid::new_v4(), "alice")); - let handler = Arc::new(MemHandler::default()); - let data_inner = make_data(repo, user_repo, handler); - + let activity_repo = Arc::new(MemActivityRepo::default()); + let data_inner = make_data( + activity_repo, + Arc::new(MemFollowRepo), + Arc::new(MemActorRepo), + Arc::new(MemBlocklistRepo::default()), + Arc::new(MemUserRepo::with_user(uuid::Uuid::new_v4(), "alice")), + Arc::new(MemContentReader), + Arc::new(MemHandler::default()), + ); let config = FederationConfig::builder() .domain("example.com") .app_data(data_inner) @@ -185,15 +248,12 @@ async fn check_guards_idempotency() { let activity_id: Url = "https://remote.example/activities/abc123".parse().unwrap(); let actor: Url = "https://remote.example/users/bob".parse().unwrap(); - // First call: not processed yet → should NOT skip let skip = check_guards(&activity_id, &actor, &data).await.unwrap(); assert!(!skip, "first delivery should not be skipped"); - // Second call with same activity ID → should skip (duplicate) let skip = check_guards(&activity_id, &actor, &data).await.unwrap(); assert!(skip, "duplicate delivery should be skipped"); - // Different activity ID → should not skip let other_id: Url = "https://remote.example/activities/xyz999".parse().unwrap(); let skip = check_guards(&other_id, &actor, &data).await.unwrap(); assert!(!skip, "different activity should not be skipped"); @@ -204,14 +264,18 @@ async fn check_guards_blocks_domain() { use crate::activities::helpers::check_guards; use activitypub_federation::config::FederationConfig; - let repo = Arc::new(MemRepo { - blocked_domains: Mutex::new(["spam.example".to_string()].into()), - ..Default::default() - }); - let user_repo = Arc::new(MemUserRepo::with_user(uuid::Uuid::new_v4(), "alice")); - let handler = Arc::new(MemHandler::default()); - let data_inner = make_data(repo, user_repo, handler); - + let blocklist_repo = Arc::new(MemBlocklistRepo::with_blocked_domains( + ["spam.example".to_string()], + )); + let data_inner = make_data( + Arc::new(MemActivityRepo::default()), + Arc::new(MemFollowRepo), + Arc::new(MemActorRepo), + blocklist_repo, + Arc::new(MemUserRepo::with_user(uuid::Uuid::new_v4(), "alice")), + Arc::new(MemContentReader), + Arc::new(MemHandler::default()), + ); let config = FederationConfig::builder() .domain("example.com") .app_data(data_inner) @@ -224,7 +288,6 @@ async fn check_guards_blocks_domain() { let activity_id: Url = "https://spam.example/activities/1".parse().unwrap(); let actor: Url = "https://spam.example/users/evil".parse().unwrap(); - // Blocked domain → should skip let skip = check_guards(&activity_id, &actor, &data).await.unwrap(); assert!(skip, "activity from blocked domain should be skipped"); } @@ -235,11 +298,16 @@ async fn extract_and_dispatch_mentions_notifies_local_users() { use activitypub_federation::config::FederationConfig; let local_user_id = uuid::Uuid::new_v4(); - let user_repo = Arc::new(MemUserRepo::with_user(local_user_id, "alice")); let handler = Arc::new(MemHandler::default()); - let repo = Arc::new(MemRepo::default()); - let data_inner = make_data(repo, user_repo.clone(), handler.clone()); - + let data_inner = make_data( + Arc::new(MemActivityRepo::default()), + Arc::new(MemFollowRepo), + Arc::new(MemActorRepo), + Arc::new(MemBlocklistRepo::default()), + Arc::new(MemUserRepo::with_user(local_user_id, "alice")), + Arc::new(MemContentReader), + handler.clone(), + ); let config = FederationConfig::builder() .domain("example.com") .app_data(data_inner) @@ -251,16 +319,12 @@ async fn extract_and_dispatch_mentions_notifies_local_users() { let ap_id: Url = "https://remote.example/notes/1".parse().unwrap(); let actor_url: Url = "https://remote.example/users/bob".parse().unwrap(); - - // Object with a Mention tag pointing to local user URL let local_user_url = format!("https://example.com/users/{}", local_user_id); let object = serde_json::json!({ "type": "Note", "id": ap_id.as_str(), "content": "Hello @alice", - "tag": [ - {"type": "Mention", "href": local_user_url, "name": "@alice@example.com"} - ] + "tag": [{"type": "Mention", "href": local_user_url}] }); extract_and_dispatch_mentions(&ap_id, &actor_url, &object, &data).await;