refactor!: CQRS repository split — v0.3.0

FederationRepository (34 methods) → 4 focused traits:
  ActivityRepository  (2)  — idempotency tracking
  FollowRepository    (18) — follower/following graph + migration
  ActorRepository     (6)  — keypairs, remote actor cache, announce tracking
  BlocklistRepository (8)  — domain + actor blocklists

ApObjectHandler (10 methods) → 2 traits:
  ApContentReader  (3) — get_local_objects_for_user/page, count_local_posts
  ApObjectHandler  (9) — all inbox callbacks (on_create, on_mention, etc.)

Builder changes from positional args to named setters:
  ActivityPubService::builder(base_url)
    .activity_repo(arc)
    .follow_repo(arc)
    .actor_repo(arc)
    .blocklist_repo(arc)
    .user_repo(arc)
    .content_reader(arc)
    .object_handler(arc)
    .build()

No behaviour changes.
This commit is contained in:
2026-05-29 01:47:23 +02:00
parent e11b0a6609
commit df6ff4c1e8
27 changed files with 529 additions and 358 deletions

2
Cargo.lock generated
View File

@@ -1368,7 +1368,7 @@ dependencies = [
[[package]] [[package]]
name = "k-ap" name = "k-ap"
version = "0.2.0" version = "0.3.0"
dependencies = [ dependencies = [
"activitypub_federation", "activitypub_federation",
"anyhow", "anyhow",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "k-ap" name = "k-ap"
version = "0.2.0" version = "0.3.0"
edition = "2024" edition = "2024"
description = "Generic ActivityPub protocol layer" description = "Generic ActivityPub protocol layer"
license = "MIT" license = "MIT"

View File

@@ -46,7 +46,7 @@ impl Activity for AcceptActivity {
} }
let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner()) let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner())
.ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?; .ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?;
data.federation_repo data.follow_repo
.update_following_status( .update_following_status(
local_user_id, local_user_id,
self.actor.inner().as_str(), self.actor.inner().as_str(),

View File

@@ -57,7 +57,7 @@ impl Activity for AnnounceActivity {
tracing::debug!(actor = %self.actor.inner(), object = %self.object, "received Announce of non-local object"); tracing::debug!(actor = %self.actor.inner(), object = %self.object, "received Announce of non-local object");
return Ok(()); return Ok(());
} }
data.federation_repo data.actor_repo
.add_announce( .add_announce(
self.id.as_str(), self.id.as_str(),
self.object.as_str(), self.object.as_str(),

View File

@@ -45,8 +45,8 @@ impl Activity for BlockActivity {
return Ok(()); return Ok(());
} }
if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) { if let Some(local_user_id) = crate::urls::extract_user_id_from_url(&self.object) {
let _ = data.federation_repo.remove_following(local_user_id, self.actor.inner().as_str()).await; let _ = data.follow_repo.remove_following(local_user_id, self.actor.inner().as_str()).await;
let _ = data.federation_repo.remove_follower(local_user_id, self.actor.inner().as_str()).await; let _ = data.follow_repo.remove_follower(local_user_id, self.actor.inner().as_str()).await;
} }
tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower"); tracing::info!(actor = %self.actor.inner(), "received block — removed following and follower");
Ok(()) Ok(())

View File

@@ -57,7 +57,7 @@ impl Activity for FollowActivity {
} }
// Actor block checked BEFORE any outbound HTTP fetch. // Actor block checked BEFORE any outbound HTTP fetch.
if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) { if let Some(target_user_id) = crate::urls::extract_user_id_from_url(self.object.inner()) {
if data.federation_repo if data.blocklist_repo
.is_actor_blocked(target_user_id, self.actor.inner().as_str()) .is_actor_blocked(target_user_id, self.actor.inner().as_str())
.await? .await?
{ {
@@ -67,7 +67,7 @@ impl Activity for FollowActivity {
} }
let _follower = self.actor.dereference(data).await?; let _follower = self.actor.dereference(data).await?;
let local_actor = self.object.dereference(data).await?; let local_actor = self.object.dereference(data).await?;
data.federation_repo data.follow_repo
.add_follower( .add_follower(
local_actor.user_id, local_actor.user_id,
self.actor.inner().as_str(), self.actor.inner().as_str(),

View File

@@ -9,13 +9,13 @@ use crate::error::Error;
/// On repo error, skips the check rather than silently dropping the activity. /// On repo error, skips the check rather than silently dropping the activity.
pub(crate) async fn already_processed(activity_id: &Url, data: &Data<FederationData>) -> bool { pub(crate) async fn already_processed(activity_id: &Url, data: &Data<FederationData>) -> bool {
let id = activity_id.as_str(); let id = activity_id.as_str();
match data.federation_repo.is_activity_processed(id).await { match data.activity_repo.is_activity_processed(id).await {
Ok(true) => { Ok(true) => {
tracing::debug!(activity_id = id, "duplicate activity, skipping"); tracing::debug!(activity_id = id, "duplicate activity, skipping");
true true
} }
Ok(false) => { Ok(false) => {
if let Err(e) = data.federation_repo.mark_activity_processed(id).await { if let Err(e) = data.activity_repo.mark_activity_processed(id).await {
tracing::warn!(activity_id = id, error = %e, "failed to mark activity processed"); tracing::warn!(activity_id = id, error = %e, "failed to mark activity processed");
} }
false false
@@ -39,7 +39,7 @@ pub(crate) async fn check_guards(
return Ok(true); return Ok(true);
} }
let domain = actor.host_str().unwrap_or(""); let domain = actor.host_str().unwrap_or("");
if data.federation_repo.is_domain_blocked(domain).await? { if data.blocklist_repo.is_domain_blocked(domain).await? {
tracing::info!(actor = %actor, "ignoring activity from blocked domain"); tracing::info!(actor = %actor, "ignoring activity from blocked domain");
return Ok(true); return Ok(true);
} }

View File

@@ -59,7 +59,7 @@ impl Activity for MoveActivity {
))); )));
} }
let affected = data let affected = data
.federation_repo .follow_repo
.migrate_follower_actor(self.object.as_str(), self.target.as_str()) .migrate_follower_actor(self.object.as_str(), self.target.as_str())
.await .await
.map_err(|e| Error::from(anyhow::anyhow!("{e}")))?; .map_err(|e| Error::from(anyhow::anyhow!("{e}")))?;

View File

@@ -44,7 +44,7 @@ impl Activity for RejectActivity {
return Ok(()); return Ok(());
} }
if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) { if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) {
data.federation_repo data.follow_repo
.remove_following(user_id, self.actor.inner().as_str()) .remove_following(user_id, self.actor.inner().as_str())
.await?; .await?;
} }

View File

@@ -53,7 +53,7 @@ impl Activity for UndoActivity {
&& let Ok(url) = Url::parse(obj_url) && let Ok(url) = Url::parse(obj_url)
&& let Some(user_id) = crate::urls::extract_user_id_from_url(&url) && let Some(user_id) = crate::urls::extract_user_id_from_url(&url)
{ {
data.federation_repo data.follow_repo
.remove_follower(user_id, self.actor.inner().as_str()) .remove_follower(user_id, self.actor.inner().as_str())
.await?; .await?;
} }

View File

@@ -141,7 +141,7 @@ pub async fn get_local_actor(
.ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?; .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?;
let (public_key, private_key) = match data let (public_key, private_key) = match data
.federation_repo .actor_repo
.get_local_actor_keypair(user_id) .get_local_actor_keypair(user_id)
.await? .await?
{ {
@@ -151,7 +151,7 @@ pub async fn get_local_actor(
// Zeroize the private key after storing it so the plaintext doesn't // Zeroize the private key after storing it so the plaintext doesn't
// linger in memory beyond this scope. // linger in memory beyond this scope.
let private_zeroized = Zeroizing::new(kp.private_key.clone()); let private_zeroized = Zeroizing::new(kp.private_key.clone());
data.federation_repo data.actor_repo
.save_local_actor_keypair( .save_local_actor_keypair(
user_id, user_id,
kp.public_key.clone(), kp.public_key.clone(),
@@ -231,7 +231,7 @@ impl Object for DbActor {
}; };
let keypair = data let keypair = data
.federation_repo .actor_repo
.get_local_actor_keypair(user_id) .get_local_actor_keypair(user_id)
.await?; .await?;
@@ -363,7 +363,7 @@ impl Object for DbActor {
avatar_url: json.icon.as_ref().map(|i| i.url.to_string()), avatar_url: json.icon.as_ref().map(|i| i.url.to_string()),
outbox_url: json.outbox.as_ref().map(|u| u.to_string()), outbox_url: json.outbox.as_ref().map(|u| u.to_string()),
}; };
data.federation_repo.upsert_remote_actor(actor).await?; data.actor_repo.upsert_remote_actor(actor).await?;
let url_str = json.id.inner().to_string(); let url_str = json.id.inner().to_string();
let user_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes()); let user_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes());

View File

@@ -2,17 +2,19 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use url::Url; use url::Url;
/// Read side — the library queries this when sending content outward.
/// Implement on the same struct as [`ApObjectHandler`] if you prefer
/// a single database type.
#[async_trait] #[async_trait]
pub trait ApObjectHandler: Send + Sync { pub trait ApContentReader: Send + Sync {
/// Returns (ap_id, serialized object) for all local content owned by this user. /// All locally-authored objects for this user. Used by backfill on accept_follower.
/// Used by outbox (count) and backfill (delivery). Must only return locally-authored content.
async fn get_local_objects_for_user( async fn get_local_objects_for_user(
&self, &self,
user_id: uuid::Uuid, user_id: uuid::Uuid,
) -> anyhow::Result<Vec<(Url, serde_json::Value)>>; ) -> anyhow::Result<Vec<(Url, serde_json::Value)>>;
/// Returns up to `limit` objects ordered newest-first, published before `before`. /// Newest-first page of locally-authored objects, published before `before`.
/// Returns (ap_id, object_json, published_at). /// Returns `(ap_id, object_json, published_at)`. Used by the outbox endpoint.
async fn get_local_objects_page( async fn get_local_objects_page(
&self, &self,
user_id: uuid::Uuid, user_id: uuid::Uuid,
@@ -20,7 +22,13 @@ pub trait ApObjectHandler: Send + Sync {
limit: usize, limit: usize,
) -> anyhow::Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>>; ) -> anyhow::Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>>;
/// Incoming Create activity — persist remote content. /// Total locally-authored posts across all users. Used by NodeInfo.
async fn count_local_posts(&self) -> anyhow::Result<u64>;
}
/// Write side — the library calls these when processing inbound AP activities.
#[async_trait]
pub trait ApObjectHandler: Send + Sync {
async fn on_create( async fn on_create(
&self, &self,
ap_id: &Url, ap_id: &Url,
@@ -28,7 +36,6 @@ pub trait ApObjectHandler: Send + Sync {
object: serde_json::Value, object: serde_json::Value,
) -> anyhow::Result<()>; ) -> anyhow::Result<()>;
/// Incoming Update activity — update existing remote content.
async fn on_update( async fn on_update(
&self, &self,
ap_id: &Url, ap_id: &Url,
@@ -36,43 +43,30 @@ pub trait ApObjectHandler: Send + Sync {
object: serde_json::Value, object: serde_json::Value,
) -> anyhow::Result<()>; ) -> anyhow::Result<()>;
/// Incoming Delete activity — remove specific remote content.
async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()>; async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// Actor unfollowed/was removed — clean up all their remote content.
async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>; async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>;
/// Called when a remote actor likes a local thought.
/// `object_url` is the AP URL of the liked note (e.g. `{base}/thoughts/{uuid}`).
/// `actor_url` is the AP URL of the remote actor who sent the Like.
async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// Called when a remote actor boosts (Announce) a local thought.
/// `object_url` is the AP URL of the announced note.
/// `actor_url` is the AP URL of the remote actor who sent the Announce.
async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// Called when a remote actor removes a Like from a local thought.
async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
/// Called when an inbound Note tags a local user with a Mention. async fn on_announce_received(
async fn on_mention(
&self, &self,
thought_ap_id: &Url, object_url: &Url,
mentioned_user_uuid: uuid::Uuid,
actor_url: &Url, actor_url: &Url,
) -> anyhow::Result<()>; ) -> anyhow::Result<()>;
/// Called when a remote actor boosts (Announce) a non-local object.
/// Use this to surface cross-server boosts in followers' feeds.
/// `object_url` is the AP URL of the announced note.
/// `actor_url` is the AP URL of the remote actor who sent the Announce.
async fn on_announce_of_remote( async fn on_announce_of_remote(
&self, &self,
object_url: &Url, object_url: &Url,
actor_url: &Url, actor_url: &Url,
) -> anyhow::Result<()>; ) -> anyhow::Result<()>;
/// Total number of locally-authored posts across all users. async fn on_mention(
async fn count_local_posts(&self) -> anyhow::Result<u64>; &self,
thought_ap_id: &Url,
mentioned_user_uuid: uuid::Uuid,
actor_url: &Url,
) -> anyhow::Result<()>;
} }

View File

@@ -1,35 +1,25 @@
use std::sync::Arc; use std::sync::Arc;
use crate::content::ApObjectHandler; use crate::content::{ApContentReader, ApObjectHandler};
use crate::repository::FederationRepository; use crate::repository::{ActivityRepository, ActorRepository, BlocklistRepository, FollowRepository};
use crate::user::ApUserRepository; use crate::user::ApUserRepository;
/// Typed event emitted by the federation layer. Consumers wire in an /// Typed event emitted by the federation layer.
/// [`EventPublisher`] to receive these and drive side effects (job queues,
/// webhooks, metrics, etc.).
/// ///
/// # Delivery flow /// When an [`EventPublisher`] is configured, outbound activities are NOT
///
/// When an `EventPublisher` is configured, outbound activities are NOT
/// delivered directly — instead a [`FederationEvent::DeliveryRequested`] event /// delivered directly — instead a [`FederationEvent::DeliveryRequested`] event
/// is published for each target inbox. The consumer's job queue should: /// is published per inbox. The consumer's job queue should:
/// 1. Persist the event. /// 1. Persist the event.
/// 2. Call [`crate::service::ActivityPubService::deliver_to_inbox`] when /// 2. Call [`crate::service::ActivityPubService::deliver_to_inbox`] when processing.
/// processing the queue item.
/// ///
/// Without a publisher, the library falls back to fire-and-forget /// Without a publisher, the library falls back to `tokio::spawn` delivery.
/// `tokio::spawn` delivery (no persistence across restarts).
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum FederationEvent { pub enum FederationEvent {
/// An outbound activity must be delivered to `inbox`.
/// Call `ActivityPubService::deliver_to_inbox(inbox, activity, signing_actor_id)`.
DeliveryRequested { DeliveryRequested {
inbox: url::Url, inbox: url::Url,
activity: serde_json::Value, activity: serde_json::Value,
signing_actor_id: uuid::Uuid, signing_actor_id: uuid::Uuid,
}, },
/// Delivery to `inbox` failed permanently after all in-process retries.
/// The consumer may schedule additional retries or alert.
DeliveryFailed { DeliveryFailed {
inbox: url::Url, inbox: url::Url,
activity: serde_json::Value, activity: serde_json::Value,
@@ -38,10 +28,7 @@ pub enum FederationEvent {
}, },
} }
/// Receives typed federation events from the library. /// Receives typed federation events.
///
/// Implement this trait to bridge federation events into your application's
/// job queue, message broker, or metrics system.
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait EventPublisher: Send + Sync { pub trait EventPublisher: Send + Sync {
async fn publish(&self, event: FederationEvent) -> anyhow::Result<()>; async fn publish(&self, event: FederationEvent) -> anyhow::Result<()>;
@@ -49,8 +36,12 @@ pub trait EventPublisher: Send + Sync {
#[derive(Clone)] #[derive(Clone)]
pub struct FederationData { pub struct FederationData {
pub(crate) federation_repo: Arc<dyn FederationRepository>, pub(crate) activity_repo: Arc<dyn ActivityRepository>,
pub(crate) follow_repo: Arc<dyn FollowRepository>,
pub(crate) actor_repo: Arc<dyn ActorRepository>,
pub(crate) blocklist_repo: Arc<dyn BlocklistRepository>,
pub(crate) user_repo: Arc<dyn ApUserRepository>, pub(crate) user_repo: Arc<dyn ApUserRepository>,
pub(crate) content_reader: Arc<dyn ApContentReader>,
pub(crate) object_handler: Arc<dyn ApObjectHandler>, pub(crate) object_handler: Arc<dyn ApObjectHandler>,
pub(crate) base_url: String, pub(crate) base_url: String,
pub(crate) domain: String, pub(crate) domain: String,
@@ -60,9 +51,14 @@ pub struct FederationData {
} }
impl FederationData { impl FederationData {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
federation_repo: Arc<dyn FederationRepository>, activity_repo: Arc<dyn ActivityRepository>,
follow_repo: Arc<dyn FollowRepository>,
actor_repo: Arc<dyn ActorRepository>,
blocklist_repo: Arc<dyn BlocklistRepository>,
user_repo: Arc<dyn ApUserRepository>, user_repo: Arc<dyn ApUserRepository>,
content_reader: Arc<dyn ApContentReader>,
object_handler: Arc<dyn ApObjectHandler>, object_handler: Arc<dyn ApObjectHandler>,
base_url: String, base_url: String,
allow_registration: bool, allow_registration: bool,
@@ -77,8 +73,12 @@ impl FederationData {
.unwrap_or("") .unwrap_or("")
.to_string(); .to_string();
Self { Self {
federation_repo, activity_repo,
follow_repo,
actor_repo,
blocklist_repo,
user_repo, user_repo,
content_reader,
object_handler, object_handler,
base_url, base_url,
domain, domain,

View File

@@ -33,8 +33,8 @@ async fn collection_handler(
); );
let total = match collection_type { let total = match collection_type {
"followers" => data.federation_repo.count_followers(user_id).await, "followers" => data.follow_repo.count_followers(user_id).await,
_ => data.federation_repo.count_following(user_id).await, _ => data.follow_repo.count_following(user_id).await,
} }
.map_err(Error::from)?; .map_err(Error::from)?;
@@ -44,7 +44,7 @@ async fn collection_handler(
let items: Vec<String> = match collection_type { let items: Vec<String> = match collection_type {
"followers" => data "followers" => data
.federation_repo .follow_repo
.get_followers_page(user_id, offset as u32, AP_PAGE_SIZE) .get_followers_page(user_id, offset as u32, AP_PAGE_SIZE)
.await .await
.map_err(Error::from)? .map_err(Error::from)?
@@ -52,7 +52,7 @@ async fn collection_handler(
.map(|f| f.actor.url) .map(|f| f.actor.url)
.collect(), .collect(),
_ => data _ => data
.federation_repo .follow_repo
.get_following_page(user_id, offset as u32, AP_PAGE_SIZE) .get_following_page(user_id, offset as u32, AP_PAGE_SIZE)
.await .await
.map_err(Error::from)? .map_err(Error::from)?

View File

@@ -17,12 +17,13 @@ pub mod webfinger;
pub use urls::AS_PUBLIC; pub use urls::AS_PUBLIC;
pub use activitypub_federation::kinds::object::NoteType; pub use activitypub_federation::kinds::object::NoteType;
pub use content::ApObjectHandler; pub use content::{ApContentReader, ApObjectHandler};
pub use data::{EventPublisher, FederationData, FederationEvent}; pub use data::{EventPublisher, FederationData, FederationEvent};
pub use error::Error; pub use error::Error;
pub use federation::ApFederationConfig; pub use federation::ApFederationConfig;
pub use repository::{ pub use repository::{
BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository,
Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor,
}; };
pub use service::ActivityPubService; pub use service::ActivityPubService;
pub use user::{ApActorType, ApProfileField, ApUser, ApUserRepository, ApVisibility, LookedUpActor}; pub use user::{ApActorType, ApProfileField, ApUser, ApUserRepository, ApVisibility, LookedUpActor};

View File

@@ -60,7 +60,7 @@ pub async fn nodeinfo_well_known_handler(
pub async fn nodeinfo_handler(data: Data<FederationData>) -> Result<Json<NodeInfo>, Error> { pub async fn nodeinfo_handler(data: Data<FederationData>) -> Result<Json<NodeInfo>, Error> {
let user_count = data.user_repo.count_users().await.unwrap_or(0); let user_count = data.user_repo.count_users().await.unwrap_or(0);
let local_posts = data.object_handler.count_local_posts().await.unwrap_or(0); let local_posts = data.content_reader.count_local_posts().await.unwrap_or(0);
Ok(Json(NodeInfo { Ok(Json(NodeInfo {
version: "2.0".to_string(), version: "2.0".to_string(),

View File

@@ -66,7 +66,7 @@ pub async fn outbox_handler(
// if count_local_posts returns 0. In practice this trait method is called // if count_local_posts returns 0. In practice this trait method is called
// infrequently (only on the root collection endpoint). // infrequently (only on the root collection endpoint).
let total = data let total = data
.object_handler .content_reader
.count_local_posts() .count_local_posts()
.await .await
.map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?; .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?;
@@ -75,7 +75,7 @@ pub async fn outbox_handler(
let before: Option<DateTime<Utc>> = query.before.as_deref().and_then(|s| s.parse().ok()); let before: Option<DateTime<Utc>> = query.before.as_deref().and_then(|s| s.parse().ok());
let items = data let items = data
.object_handler .content_reader
.get_local_objects_page(uuid, before, AP_PAGE_SIZE) .get_local_objects_page(uuid, before, AP_PAGE_SIZE)
.await .await
.map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?; .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?;

View File

@@ -1,160 +0,0 @@
use anyhow::Result;
use async_trait::async_trait;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FollowerStatus {
Pending,
Accepted,
Rejected,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FollowingStatus {
Pending,
Accepted,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteActor {
pub url: String,
pub handle: String,
pub inbox_url: String,
pub shared_inbox_url: Option<String>,
pub display_name: Option<String>,
pub avatar_url: Option<String>,
pub outbox_url: Option<String>,
}
#[derive(Debug, Clone)]
pub struct Follower {
pub actor: RemoteActor,
pub status: FollowerStatus,
}
#[derive(Debug, Clone)]
pub struct BlockedDomain {
pub domain: String,
pub reason: Option<String>,
pub blocked_at: String,
}
#[async_trait]
pub trait FederationRepository: Send + Sync {
async fn add_follower(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowerStatus,
follow_activity_id: &str,
) -> Result<()>;
async fn get_follower_follow_activity_id(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>>;
async fn remove_follower(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<()>;
async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result<Vec<Follower>>;
async fn get_followers_page(
&self,
local_user_id: uuid::Uuid,
offset: u32,
limit: usize,
) -> Result<Vec<Follower>>;
async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result<usize>;
async fn get_following_page(
&self,
local_user_id: uuid::Uuid,
offset: u32,
limit: usize,
) -> Result<Vec<RemoteActor>>;
async fn update_follower_status(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowerStatus,
) -> Result<()>;
async fn add_following(
&self,
local_user_id: uuid::Uuid,
actor: RemoteActor,
follow_activity_id: &str,
) -> Result<()>;
async fn get_follow_activity_id(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>>;
async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
async fn get_following(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>>;
async fn count_following(&self, local_user_id: uuid::Uuid) -> Result<usize>;
async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>;
async fn get_remote_actor(&self, actor_url: &str) -> Result<Option<RemoteActor>>;
async fn get_local_actor_keypair(
&self,
user_id: uuid::Uuid,
) -> Result<Option<(String, String)>>;
async fn save_local_actor_keypair(
&self,
user_id: uuid::Uuid,
public_key: String,
private_key: String,
) -> Result<()>;
async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>>;
async fn update_following_status(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowingStatus,
) -> Result<()>;
async fn get_following_outbox_url(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>>;
async fn add_announce(
&self,
activity_id: &str,
object_url: &str,
actor_url: &str,
announced_at: chrono::DateTime<chrono::Utc>,
) -> Result<()>;
async fn count_announces(&self, object_url: &str) -> Result<usize>;
async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()>;
async fn remove_blocked_domain(&self, domain: &str) -> Result<()>;
async fn get_blocked_domains(&self) -> Result<Vec<BlockedDomain>>;
async fn is_domain_blocked(&self, domain: &str) -> Result<bool>;
async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>;
async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>>;
async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool>;
/// Migrate all local following records from old_actor_url to new_actor_url.
/// Returns the local user IDs whose records were migrated (excludes users
/// already following the new actor — they need no re-follow).
async fn migrate_follower_actor(
&self,
old_actor_url: &str,
new_actor_url: &str,
) -> Result<Vec<uuid::Uuid>>;
/// Return `true` if an activity with `activity_id` has already been processed.
/// Implementations should enforce a UNIQUE constraint on the stored activity IDs
/// so concurrent duplicate deliveries are safely rejected.
async fn is_activity_processed(&self, activity_id: &str) -> Result<bool>;
/// Record `activity_id` as processed. Called immediately before dispatching
/// each inbound activity so that retried deliveries are no-ops.
async fn mark_activity_processed(&self, activity_id: &str) -> Result<()>;
/// Return deduplicated inbox URLs (shared_inbox preferred over personal inbox)
/// for all **accepted** followers of `local_user_id`, excluding any actors or
/// domains that are blocked. Implementations should perform filtering and
/// deduplication in the database rather than in application memory.
async fn get_accepted_follower_inboxes(
&self,
local_user_id: uuid::Uuid,
) -> Result<Vec<String>>;
}

View File

@@ -0,0 +1,11 @@
use anyhow::Result;
use async_trait::async_trait;
/// Tracks which inbound AP activity IDs have already been processed.
/// Prevents duplicate handling when remote servers retry delivery.
/// Implementations should enforce a UNIQUE constraint on stored IDs.
#[async_trait]
pub trait ActivityRepository: Send + Sync {
async fn is_activity_processed(&self, activity_id: &str) -> Result<bool>;
async fn mark_activity_processed(&self, activity_id: &str) -> Result<()>;
}

34
src/repository/actor.rs Normal file
View File

@@ -0,0 +1,34 @@
use anyhow::Result;
use async_trait::async_trait;
use super::RemoteActor;
/// Manages local actor keypairs, remote actor cache, and Announce tracking.
#[async_trait]
pub trait ActorRepository: Send + Sync {
// ── Local keypairs ──────────────────────────────────────────────────────
async fn get_local_actor_keypair(
&self,
user_id: uuid::Uuid,
) -> Result<Option<(String, String)>>;
async fn save_local_actor_keypair(
&self,
user_id: uuid::Uuid,
public_key: String,
private_key: String,
) -> Result<()>;
// ── Remote actor cache ──────────────────────────────────────────────────
async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>;
async fn get_remote_actor(&self, actor_url: &str) -> Result<Option<RemoteActor>>;
// ── Boost (Announce) tracking ───────────────────────────────────────────
async fn add_announce(
&self,
activity_id: &str,
object_url: &str,
actor_url: &str,
announced_at: chrono::DateTime<chrono::Utc>,
) -> Result<()>;
async fn count_announces(&self, object_url: &str) -> Result<usize>;
}

View File

@@ -0,0 +1,35 @@
use anyhow::Result;
use async_trait::async_trait;
use super::BlockedDomain;
/// Domain and actor-level blocklists.
#[async_trait]
pub trait BlocklistRepository: Send + Sync {
// ── Domain blocklist ────────────────────────────────────────────────────
async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()>;
async fn remove_blocked_domain(&self, domain: &str) -> Result<()>;
async fn get_blocked_domains(&self) -> Result<Vec<BlockedDomain>>;
async fn is_domain_blocked(&self, domain: &str) -> Result<bool>;
// ── Per-user actor blocklist ────────────────────────────────────────────
async fn add_blocked_actor(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> Result<()>;
async fn remove_blocked_actor(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> Result<()>;
async fn get_blocked_actors(
&self,
local_user_id: uuid::Uuid,
) -> Result<Vec<String>>;
async fn is_actor_blocked(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> Result<bool>;
}

97
src/repository/follow.rs Normal file
View File

@@ -0,0 +1,97 @@
use anyhow::Result;
use async_trait::async_trait;
use super::{Follower, FollowerStatus, FollowingStatus, RemoteActor};
/// Manages follower/following relationships and account migration.
#[async_trait]
pub trait FollowRepository: Send + Sync {
// ── Inbound followers ───────────────────────────────────────────────────
async fn add_follower(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowerStatus,
follow_activity_id: &str,
) -> Result<()>;
async fn get_follower_follow_activity_id(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>>;
async fn remove_follower(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<()>;
async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result<Vec<Follower>>;
async fn get_followers_page(
&self,
local_user_id: uuid::Uuid,
offset: u32,
limit: usize,
) -> Result<Vec<Follower>>;
async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result<usize>;
async fn update_follower_status(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowerStatus,
) -> Result<()>;
async fn get_pending_followers(
&self,
local_user_id: uuid::Uuid,
) -> Result<Vec<RemoteActor>>;
/// Return deduplicated inbox URLs (shared_inbox preferred) for accepted
/// followers, excluding blocked actors/domains. DB-side filtering.
async fn get_accepted_follower_inboxes(
&self,
local_user_id: uuid::Uuid,
) -> Result<Vec<String>>;
// ── Outbound following ──────────────────────────────────────────────────
async fn add_following(
&self,
local_user_id: uuid::Uuid,
actor: RemoteActor,
follow_activity_id: &str,
) -> Result<()>;
async fn get_follow_activity_id(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>>;
async fn remove_following(
&self,
local_user_id: uuid::Uuid,
actor_url: &str,
) -> Result<()>;
async fn get_following(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>>;
async fn get_following_page(
&self,
local_user_id: uuid::Uuid,
offset: u32,
limit: usize,
) -> Result<Vec<RemoteActor>>;
async fn count_following(&self, local_user_id: uuid::Uuid) -> Result<usize>;
async fn update_following_status(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowingStatus,
) -> Result<()>;
async fn get_following_outbox_url(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>>;
// ── Account migration ───────────────────────────────────────────────────
/// Migrate all follower records from `old_actor_url` to `new_actor_url`.
/// Returns local user IDs that need a re-follow sent.
async fn migrate_follower_actor(
&self,
old_actor_url: &str,
new_actor_url: &str,
) -> Result<Vec<uuid::Uuid>>;
}

46
src/repository/mod.rs Normal file
View File

@@ -0,0 +1,46 @@
mod activity;
mod actor;
mod blocklist;
mod follow;
pub use activity::ActivityRepository;
pub use actor::ActorRepository;
pub use blocklist::BlocklistRepository;
pub use follow::FollowRepository;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FollowerStatus {
Pending,
Accepted,
Rejected,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FollowingStatus {
Pending,
Accepted,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteActor {
pub url: String,
pub handle: String,
pub inbox_url: String,
pub shared_inbox_url: Option<String>,
pub display_name: Option<String>,
pub avatar_url: Option<String>,
pub outbox_url: Option<String>,
}
#[derive(Debug, Clone)]
pub struct Follower {
pub actor: RemoteActor,
pub status: FollowerStatus,
}
#[derive(Debug, Clone)]
pub struct BlockedDomain {
pub domain: String,
pub reason: Option<String>,
pub blocked_at: String,
}

View File

@@ -86,7 +86,7 @@ impl ActivityPubService {
loop { loop {
let page = data let page = data
.object_handler .content_reader
.get_local_objects_page(owner_user_id, before, BATCH_SIZE) .get_local_objects_page(owner_user_id, before, BATCH_SIZE)
.await?; .await?;

View File

@@ -33,7 +33,7 @@ impl ActivityPubService {
outbox_url: Some(remote_actor.outbox_url.to_string()), outbox_url: Some(remote_actor.outbox_url.to_string()),
}; };
// Save BEFORE delivering — prevents lost state on process restart. // Save BEFORE delivering — prevents lost state on process restart.
data.federation_repo.add_following(local_user_id, remote, &follow_id_str).await?; data.follow_repo.add_following(local_user_id, remote, &follow_id_str).await?;
let follow = FollowActivity { let follow = FollowActivity {
id: Url::parse(&follow_id_str)?, id: Url::parse(&follow_id_str)?,
kind: Default::default(), kind: Default::default(),
@@ -49,12 +49,12 @@ impl ActivityPubService {
if actor_url_str.starts_with(&self.base_url) { if actor_url_str.starts_with(&self.base_url) {
return self.unfollow_local(local_user_id, actor_url_str, &data).await; return self.unfollow_local(local_user_id, actor_url_str, &data).await;
} }
let remote = data.federation_repo.get_remote_actor(actor_url_str).await? let remote = data.actor_repo.get_remote_actor(actor_url_str).await?
.ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?;
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?;
let remote_ap_id = Url::parse(actor_url_str)?; let remote_ap_id = Url::parse(actor_url_str)?;
let inbox = Url::parse(&remote.inbox_url)?; let inbox = Url::parse(&remote.inbox_url)?;
let follow_id = data.federation_repo.get_follow_activity_id(local_user_id, actor_url_str).await? let follow_id = data.follow_repo.get_follow_activity_id(local_user_id, actor_url_str).await?
.and_then(|id| Url::parse(&id).ok()) .and_then(|id| Url::parse(&id).ok())
.unwrap_or_else(|| activity_url(&self.base_url).unwrap_or_else(|_| remote_ap_id.clone())); .unwrap_or_else(|| activity_url(&self.base_url).unwrap_or_else(|_| remote_ap_id.clone()));
let follow = FollowActivity { id: follow_id, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: ObjectId::from(remote_ap_id) }; let follow = FollowActivity { id: follow_id, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: ObjectId::from(remote_ap_id) };
@@ -66,7 +66,7 @@ impl ActivityPubService {
}; };
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], undo).await?; let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], undo).await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?;
data.federation_repo.remove_following(local_user_id, actor_url_str).await?; data.follow_repo.remove_following(local_user_id, actor_url_str).await?;
data.object_handler.on_actor_removed(&Url::parse(actor_url_str)?).await?; data.object_handler.on_actor_removed(&Url::parse(actor_url_str)?).await?;
Ok(()) Ok(())
} }
@@ -74,13 +74,13 @@ impl ActivityPubService {
pub async fn accept_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { pub async fn accept_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?;
let remote_actor = data.federation_repo.get_remote_actor(remote_actor_url).await? let remote_actor = data.actor_repo.get_remote_actor(remote_actor_url).await?
.ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?;
let follow_id_str = data.federation_repo.get_follower_follow_activity_id(local_user_id, remote_actor_url).await? let follow_id_str = data.follow_repo.get_follower_follow_activity_id(local_user_id, remote_actor_url).await?
.ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?; .ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?;
let follow = FollowActivity { id: Url::parse(&follow_id_str)?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; let follow = FollowActivity { id: Url::parse(&follow_id_str)?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) };
let accept = AcceptActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; let accept = AcceptActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow };
data.federation_repo.update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted).await?; data.follow_repo.update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted).await?;
let inbox = Url::parse(&remote_actor.inbox_url)?; let inbox = Url::parse(&remote_actor.inbox_url)?;
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], accept).await?; let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], accept).await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?;
@@ -92,25 +92,25 @@ impl ActivityPubService {
pub async fn reject_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { pub async fn reject_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?;
let remote_actor = data.federation_repo.get_remote_actor(remote_actor_url).await? let remote_actor = data.actor_repo.get_remote_actor(remote_actor_url).await?
.ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?;
let follow = FollowActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; let follow = FollowActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) };
let reject = RejectActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; let reject = RejectActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow };
let inbox = Url::parse(&remote_actor.inbox_url)?; let inbox = Url::parse(&remote_actor.inbox_url)?;
let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], reject).await?; let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], reject).await?;
self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?;
data.federation_repo.remove_follower(local_user_id, remote_actor_url).await?; data.follow_repo.remove_follower(local_user_id, remote_actor_url).await?;
Ok(()) Ok(())
} }
pub async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { pub async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.get_pending_followers(local_user_id).await data.follow_repo.get_pending_followers(local_user_id).await
} }
pub async fn get_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { pub async fn get_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
Ok(data.federation_repo.get_followers(local_user_id).await? Ok(data.follow_repo.get_followers(local_user_id).await?
.into_iter() .into_iter()
.filter(|f| f.status == FollowerStatus::Accepted) .filter(|f| f.status == FollowerStatus::Accepted)
.map(|f| f.actor) .map(|f| f.actor)
@@ -119,7 +119,7 @@ impl ActivityPubService {
pub async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<usize> { pub async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result<usize> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
Ok(data.federation_repo.get_followers(local_user_id).await? Ok(data.follow_repo.get_followers(local_user_id).await?
.into_iter() .into_iter()
.filter(|f| f.status == FollowerStatus::Accepted) .filter(|f| f.status == FollowerStatus::Accepted)
.count()) .count())
@@ -127,26 +127,26 @@ impl ActivityPubService {
pub async fn get_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { pub async fn get_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.get_following(local_user_id).await data.follow_repo.get_following(local_user_id).await
} }
pub async fn count_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result<usize> { pub async fn count_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result<usize> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.count_following(local_user_id).await data.follow_repo.count_following(local_user_id).await
} }
pub async fn remove_follower(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn remove_follower(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.remove_follower(local_user_id, actor_url).await data.follow_repo.remove_follower(local_user_id, actor_url).await
} }
pub async fn block_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn block_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.add_blocked_actor(local_user_id, actor_url).await?; data.blocklist_repo.add_blocked_actor(local_user_id, actor_url).await?;
let _ = data.federation_repo.remove_follower(local_user_id, actor_url).await; let _ = data.follow_repo.remove_follower(local_user_id, actor_url).await;
let _ = data.federation_repo.remove_following(local_user_id, actor_url).await; let _ = data.follow_repo.remove_following(local_user_id, actor_url).await;
let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?;
if let Ok(Some(remote_actor)) = data.federation_repo.get_remote_actor(actor_url).await { if let Ok(Some(remote_actor)) = data.actor_repo.get_remote_actor(actor_url).await {
let block = crate::activities::BlockActivity { let block = crate::activities::BlockActivity {
id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?,
kind: Default::default(), kind: Default::default(),
@@ -162,15 +162,15 @@ impl ActivityPubService {
pub async fn unblock_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn unblock_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.remove_blocked_actor(local_user_id, actor_url).await data.blocklist_repo.remove_blocked_actor(local_user_id, actor_url).await
} }
pub async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { pub async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let actor_urls = data.federation_repo.get_blocked_actors(local_user_id).await?; let actor_urls = data.blocklist_repo.get_blocked_actors(local_user_id).await?;
let mut actors = Vec::new(); let mut actors = Vec::new();
for url in actor_urls { for url in actor_urls {
let actor = match data.federation_repo.get_remote_actor(&url).await { let actor = match data.actor_repo.get_remote_actor(&url).await {
Ok(Some(a)) => a, Ok(Some(a)) => a,
_ => RemoteActor { url: url.clone(), handle: url.clone(), inbox_url: url.clone(), shared_inbox_url: None, display_name: None, avatar_url: None, outbox_url: None }, _ => RemoteActor { url: url.clone(), handle: url.clone(), inbox_url: url.clone(), shared_inbox_url: None, display_name: None, avatar_url: None, outbox_url: None },
}; };
@@ -193,7 +193,7 @@ impl ActivityPubService {
let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string();
let target_actor_url = crate::urls::actor_url(&self.base_url, target.id); let target_actor_url = crate::urls::actor_url(&self.base_url, target.id);
let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?.to_string(); let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?.to_string();
data.federation_repo.add_follower(target.id, &follower_actor_url, FollowerStatus::Accepted, &follow_id).await?; data.follow_repo.add_follower(target.id, &follower_actor_url, FollowerStatus::Accepted, &follow_id).await?;
let target_as_remote = RemoteActor { let target_as_remote = RemoteActor {
url: target_actor_url.to_string(), url: target_actor_url.to_string(),
handle: format!("{}@{}", target.username, data.domain), handle: format!("{}@{}", target.username, data.domain),
@@ -203,8 +203,8 @@ impl ActivityPubService {
avatar_url: None, avatar_url: None,
outbox_url: None, outbox_url: None,
}; };
data.federation_repo.add_following(local_user_id, target_as_remote, &follow_id).await?; data.follow_repo.add_following(local_user_id, target_as_remote, &follow_id).await?;
data.federation_repo.update_following_status(local_user_id, target_actor_url.as_ref(), FollowingStatus::Accepted).await?; data.follow_repo.update_following_status(local_user_id, target_actor_url.as_ref(), FollowingStatus::Accepted).await?;
tracing::info!(follower = %local_user_id, followee = %target.id, "local follow"); tracing::info!(follower = %local_user_id, followee = %target.id, "local follow");
Ok(()) Ok(())
} }
@@ -219,8 +219,8 @@ impl ActivityPubService {
let target_user_id = crate::urls::extract_user_id_from_url(&target_url) let target_user_id = crate::urls::extract_user_id_from_url(&target_url)
.ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?; .ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?;
let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string();
data.federation_repo.remove_follower(target_user_id, &local_actor_url).await?; data.follow_repo.remove_follower(target_user_id, &local_actor_url).await?;
data.federation_repo.remove_following(local_user_id, target_actor_url).await?; data.follow_repo.remove_following(local_user_id, target_actor_url).await?;
tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow"); tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow");
Ok(()) Ok(())
} }

View File

@@ -10,14 +10,18 @@ use url::Url;
use crate::{ use crate::{
actor_handler::actor_handler, actor_handler::actor_handler,
actors::{DbActor, get_local_actor}, actors::{DbActor, get_local_actor},
content::ApObjectHandler, content::{ApContentReader, ApObjectHandler},
data::FederationData, data::FederationData,
federation::ApFederationConfig, federation::ApFederationConfig,
followers_handler::{followers_handler, following_handler}, followers_handler::{followers_handler, following_handler},
inbox::inbox_handler, inbox::inbox_handler,
nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler},
outbox::outbox_handler, outbox::outbox_handler,
repository::{BlockedDomain, FederationRepository}, repository::{
ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository,
FollowRepository, FollowerStatus, FollowingStatus, RemoteActor,
},
urls::activity_url,
user::ApUserRepository, user::ApUserRepository,
webfinger::webfinger_handler, webfinger::webfinger_handler,
}; };
@@ -45,9 +49,13 @@ pub struct ActivityPubService {
} }
pub struct ActivityPubServiceBuilder { pub struct ActivityPubServiceBuilder {
repo: Arc<dyn FederationRepository>, activity_repo: Option<Arc<dyn ActivityRepository>>,
user_repo: Arc<dyn ApUserRepository>, follow_repo: Option<Arc<dyn FollowRepository>>,
object_handler: Arc<dyn crate::content::ApObjectHandler>, actor_repo: Option<Arc<dyn ActorRepository>>,
blocklist_repo: Option<Arc<dyn BlocklistRepository>>,
user_repo: Option<Arc<dyn ApUserRepository>>,
content_reader: Option<Arc<dyn ApContentReader>>,
object_handler: Option<Arc<dyn ApObjectHandler>>,
base_url: String, base_url: String,
allow_registration: bool, allow_registration: bool,
software_name: String, software_name: String,
@@ -58,19 +66,56 @@ pub struct ActivityPubServiceBuilder {
} }
impl ActivityPubServiceBuilder { impl ActivityPubServiceBuilder {
pub fn activity_repo(mut self, v: Arc<dyn ActivityRepository>) -> Self {
self.activity_repo = Some(v); self
}
pub fn follow_repo(mut self, v: Arc<dyn FollowRepository>) -> Self {
self.follow_repo = Some(v); self
}
pub fn actor_repo(mut self, v: Arc<dyn ActorRepository>) -> Self {
self.actor_repo = Some(v); self
}
pub fn blocklist_repo(mut self, v: Arc<dyn BlocklistRepository>) -> Self {
self.blocklist_repo = Some(v); self
}
pub fn user_repo(mut self, v: Arc<dyn ApUserRepository>) -> Self {
self.user_repo = Some(v); self
}
pub fn content_reader(mut self, v: Arc<dyn ApContentReader>) -> Self {
self.content_reader = Some(v); self
}
pub fn object_handler(mut self, v: Arc<dyn ApObjectHandler>) -> Self {
self.object_handler = Some(v); self
}
pub fn allow_registration(mut self, v: bool) -> Self { self.allow_registration = v; self } pub fn allow_registration(mut self, v: bool) -> Self { self.allow_registration = v; self }
pub fn software_name(mut self, v: impl Into<String>) -> Self { self.software_name = v.into(); self } pub fn software_name(mut self, v: impl Into<String>) -> Self { self.software_name = v.into(); self }
pub fn debug(mut self, v: bool) -> Self { self.debug = v; self } pub fn debug(mut self, v: bool) -> Self { self.debug = v; self }
pub fn event_publisher(mut self, v: Arc<dyn crate::data::EventPublisher>) -> Self { self.event_publisher = Some(v); self } pub fn event_publisher(mut self, v: Arc<dyn crate::data::EventPublisher>) -> Self {
/// Override max delivery retries (default: `DELIVERY_MAX_ATTEMPTS`). self.event_publisher = Some(v); self
}
pub fn delivery_max_attempts(mut self, v: u32) -> Self { self.delivery_max_attempts = v; self } pub fn delivery_max_attempts(mut self, v: u32) -> Self { self.delivery_max_attempts = v; self }
/// Override initial retry backoff in seconds (default: `DELIVERY_INITIAL_DELAY_SECS`).
pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { self.delivery_initial_delay_secs = v; self } pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { self.delivery_initial_delay_secs = v; self }
pub async fn build(self) -> anyhow::Result<ActivityPubService> { pub async fn build(self) -> anyhow::Result<ActivityPubService> {
let activity_repo = self.activity_repo
.ok_or_else(|| anyhow::anyhow!("activity_repo required — call .activity_repo(arc)"))?;
let follow_repo = self.follow_repo
.ok_or_else(|| anyhow::anyhow!("follow_repo required — call .follow_repo(arc)"))?;
let actor_repo = self.actor_repo
.ok_or_else(|| anyhow::anyhow!("actor_repo required — call .actor_repo(arc)"))?;
let blocklist_repo = self.blocklist_repo
.ok_or_else(|| anyhow::anyhow!("blocklist_repo required — call .blocklist_repo(arc)"))?;
let user_repo = self.user_repo
.ok_or_else(|| anyhow::anyhow!("user_repo required — call .user_repo(arc)"))?;
let content_reader = self.content_reader
.ok_or_else(|| anyhow::anyhow!("content_reader required — call .content_reader(arc)"))?;
let object_handler = self.object_handler
.ok_or_else(|| anyhow::anyhow!("object_handler required — call .object_handler(arc)"))?;
let data = FederationData::new( let data = FederationData::new(
self.repo, self.user_repo, self.object_handler, self.base_url.clone(), activity_repo, follow_repo, actor_repo, blocklist_repo,
self.allow_registration, self.software_name, self.event_publisher, user_repo, content_reader, object_handler,
self.base_url.clone(), self.allow_registration, self.software_name,
self.event_publisher,
); );
let federation_config = ApFederationConfig::new(data, self.debug).await?; let federation_config = ApFederationConfig::new(data, self.debug).await?;
Ok(ActivityPubService { Ok(ActivityPubService {
@@ -83,16 +128,20 @@ impl ActivityPubServiceBuilder {
} }
impl ActivityPubService { impl ActivityPubService {
pub fn builder( pub fn builder(base_url: impl Into<String>) -> ActivityPubServiceBuilder {
repo: Arc<dyn FederationRepository>,
user_repo: Arc<dyn ApUserRepository>,
object_handler: Arc<dyn ApObjectHandler>,
base_url: impl Into<String>,
) -> ActivityPubServiceBuilder {
ActivityPubServiceBuilder { ActivityPubServiceBuilder {
repo, user_repo, object_handler, base_url: base_url.into(), activity_repo: None,
allow_registration: false, software_name: String::new(), follow_repo: None,
debug: false, event_publisher: None, actor_repo: None,
blocklist_repo: None,
user_repo: None,
content_reader: None,
object_handler: None,
base_url: base_url.into(),
allow_registration: false,
software_name: String::new(),
debug: false,
event_publisher: None,
delivery_max_attempts: DELIVERY_MAX_ATTEMPTS, delivery_max_attempts: DELIVERY_MAX_ATTEMPTS,
delivery_initial_delay_secs: DELIVERY_INITIAL_DELAY_SECS, delivery_initial_delay_secs: DELIVERY_INITIAL_DELAY_SECS,
} }
@@ -133,11 +182,11 @@ impl ActivityPubService {
const PAGE_SIZE: usize = 20; const PAGE_SIZE: usize = 20;
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let collection_id = format!("{}/users/{}/followers", self.base_url, user_id); let collection_id = format!("{}/users/{}/followers", self.base_url, user_id);
let total = data.federation_repo.count_followers(user_id).await?; let total = data.follow_repo.count_followers(user_id).await?;
let obj = if let Some(p) = page { let obj = if let Some(p) = page {
let p = p.max(1); let p = p.max(1);
let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE;
let followers = data.federation_repo.get_followers_page(user_id, offset as u32, PAGE_SIZE).await?; let followers = data.follow_repo.get_followers_page(user_id, offset as u32, PAGE_SIZE).await?;
let has_next = offset + followers.len() < total; let has_next = offset + followers.len() < total;
let items: Vec<String> = followers.into_iter().map(|f| f.actor.url).collect(); let items: Vec<String> = followers.into_iter().map(|f| f.actor.url).collect();
let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items});
@@ -154,11 +203,11 @@ impl ActivityPubService {
const PAGE_SIZE: usize = 20; const PAGE_SIZE: usize = 20;
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
let collection_id = format!("{}/users/{}/following", self.base_url, user_id); let collection_id = format!("{}/users/{}/following", self.base_url, user_id);
let total = data.federation_repo.count_following(user_id).await?; let total = data.follow_repo.count_following(user_id).await?;
let obj = if let Some(p) = page { let obj = if let Some(p) = page {
let p = p.max(1); let p = p.max(1);
let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE;
let following = data.federation_repo.get_following_page(user_id, offset as u32, PAGE_SIZE).await?; let following = data.follow_repo.get_following_page(user_id, offset as u32, PAGE_SIZE).await?;
let has_next = offset + following.len() < total; let has_next = offset + following.len() < total;
let items: Vec<String> = following.into_iter().map(|a| a.url).collect(); let items: Vec<String> = following.into_iter().map(|a| a.url).collect();
let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items});
@@ -172,12 +221,12 @@ impl ActivityPubService {
pub async fn mark_follower_accepted(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn mark_follower_accepted(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted).await.map_err(|e| anyhow::anyhow!("{e}")) data.follow_repo.update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted).await.map_err(|e| anyhow::anyhow!("{e}"))
} }
pub async fn mark_follower_rejected(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { pub async fn mark_follower_rejected(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.remove_follower(user_id, actor_url).await.map_err(|e| anyhow::anyhow!("{e}")) data.follow_repo.remove_follower(user_id, actor_url).await.map_err(|e| anyhow::anyhow!("{e}"))
} }
pub async fn lookup_actor_by_handle(&self, handle: &str) -> anyhow::Result<crate::user::LookedUpActor> { pub async fn lookup_actor_by_handle(&self, handle: &str) -> anyhow::Result<crate::user::LookedUpActor> {
@@ -200,17 +249,17 @@ impl ActivityPubService {
pub async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()> { pub async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.add_blocked_domain(domain, reason).await data.blocklist_repo.add_blocked_domain(domain, reason).await
} }
pub async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> { pub async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.remove_blocked_domain(domain).await data.blocklist_repo.remove_blocked_domain(domain).await
} }
pub async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> { pub async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> {
let data = self.federation_config.to_request_data(); let data = self.federation_config.to_request_data();
data.federation_repo.get_blocked_domains().await data.blocklist_repo.get_blocked_domains().await
} }
// ── Private helpers (accessible to child modules via Rust's privacy rules) ─ // ── Private helpers (accessible to child modules via Rust's privacy rules) ─
@@ -221,7 +270,7 @@ impl ActivityPubService {
local_user_id: uuid::Uuid, local_user_id: uuid::Uuid,
) -> anyhow::Result<Option<(DbActor, Vec<Url>)>> { ) -> anyhow::Result<Option<(DbActor, Vec<Url>)>> {
let local_actor = get_local_actor(local_user_id, data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let local_actor = get_local_actor(local_user_id, data).await.map_err(|e| anyhow::anyhow!("{e}"))?;
let inbox_strs = data.federation_repo.get_accepted_follower_inboxes(local_user_id).await?; let inbox_strs = data.follow_repo.get_accepted_follower_inboxes(local_user_id).await?;
if inbox_strs.is_empty() { return Ok(None); } if inbox_strs.is_empty() { return Ok(None); }
let inboxes: Vec<Url> = inbox_strs.into_iter().filter_map(|s| { let inboxes: Vec<Url> = inbox_strs.into_iter().filter_map(|s| {
Url::parse(&s).map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")).ok() Url::parse(&s).map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")).ok()

View File

@@ -1,9 +1,5 @@
/// Integration tests exercising multi-component flows with in-memory trait stubs. // src/tests/integration.rs
/// /// Integration tests with in-memory trait stubs.
/// These tests don't spin up an HTTP server but they do exercise:
/// - `check_guards` idempotency (is_activity_processed → mark → duplicate rejected)
/// - `extract_and_dispatch_mentions` dispatches on_mention for local actors
/// - Multiple trait implementations wired through FederationData
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
@@ -13,23 +9,23 @@ use chrono::{DateTime, Utc};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use url::Url; use url::Url;
use crate::content::ApObjectHandler; use crate::content::{ApContentReader, ApObjectHandler};
use crate::data::FederationData; use crate::data::FederationData;
use crate::repository::{ use crate::repository::{
BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository,
Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor,
}; };
use crate::user::{ApActorType, ApProfileField, ApUser, ApUserRepository}; use crate::user::{ApActorType, ApProfileField, ApUser, ApUserRepository};
// ── In-memory FederationRepository ─────────────────────────────────────────── // ── ActivityRepository ────────────────────────────────────────────────────────
#[derive(Default)] #[derive(Default)]
struct MemRepo { struct MemActivityRepo {
processed: Mutex<HashSet<String>>, processed: Mutex<HashSet<String>>,
blocked_domains: Mutex<HashSet<String>>,
} }
#[async_trait] #[async_trait]
impl FederationRepository for MemRepo { impl ActivityRepository for MemActivityRepo {
async fn is_activity_processed(&self, id: &str) -> anyhow::Result<bool> { async fn is_activity_processed(&self, id: &str) -> anyhow::Result<bool> {
Ok(self.processed.lock().await.contains(id)) Ok(self.processed.lock().await.contains(id))
} }
@@ -37,44 +33,89 @@ impl FederationRepository for MemRepo {
self.processed.lock().await.insert(id.to_string()); self.processed.lock().await.insert(id.to_string());
Ok(()) Ok(())
} }
async fn is_domain_blocked(&self, domain: &str) -> anyhow::Result<bool> { }
Ok(self.blocked_domains.lock().await.contains(domain))
} // ── FollowRepository ──────────────────────────────────────────────────────────
// ── stubs ────────────────────────────────────────────────────────────────
#[derive(Default)]
struct MemFollowRepo;
#[async_trait]
impl FollowRepository for MemFollowRepo {
async fn add_follower(&self, _: uuid::Uuid, _: &str, _: FollowerStatus, _: &str) -> anyhow::Result<()> { Ok(()) } async fn add_follower(&self, _: uuid::Uuid, _: &str, _: FollowerStatus, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn get_follower_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) } async fn get_follower_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) }
async fn remove_follower(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn remove_follower(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn get_followers(&self, _: uuid::Uuid) -> anyhow::Result<Vec<Follower>> { Ok(vec![]) } async fn get_followers(&self, _: uuid::Uuid) -> anyhow::Result<Vec<Follower>> { Ok(vec![]) }
async fn get_followers_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result<Vec<Follower>> { Ok(vec![]) } async fn get_followers_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result<Vec<Follower>> { Ok(vec![]) }
async fn count_followers(&self, _: uuid::Uuid) -> anyhow::Result<usize> { Ok(0) } async fn count_followers(&self, _: uuid::Uuid) -> anyhow::Result<usize> { Ok(0) }
async fn get_following_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) }
async fn update_follower_status(&self, _: uuid::Uuid, _: &str, _: FollowerStatus) -> anyhow::Result<()> { Ok(()) } async fn update_follower_status(&self, _: uuid::Uuid, _: &str, _: FollowerStatus) -> anyhow::Result<()> { Ok(()) }
async fn get_pending_followers(&self, _: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) }
async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> { Ok(vec![]) }
async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> { Ok(()) } async fn add_following(&self, _: uuid::Uuid, _: RemoteActor, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn get_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) } async fn get_follow_activity_id(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) }
async fn remove_following(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn remove_following(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn get_following(&self, _: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) } async fn get_following(&self, _: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) }
async fn get_following_page(&self, _: uuid::Uuid, _: u32, _: usize) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) }
async fn count_following(&self, _: uuid::Uuid) -> anyhow::Result<usize> { Ok(0) } async fn count_following(&self, _: uuid::Uuid) -> anyhow::Result<usize> { Ok(0) }
async fn upsert_remote_actor(&self, _: RemoteActor) -> anyhow::Result<()> { Ok(()) }
async fn get_remote_actor(&self, _: &str) -> anyhow::Result<Option<RemoteActor>> { Ok(None) }
async fn get_local_actor_keypair(&self, _: uuid::Uuid) -> anyhow::Result<Option<(String, String)>> { Ok(None) }
async fn save_local_actor_keypair(&self, _: uuid::Uuid, _: String, _: String) -> anyhow::Result<()> { Ok(()) }
async fn get_pending_followers(&self, _: uuid::Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) }
async fn update_following_status(&self, _: uuid::Uuid, _: &str, _: FollowingStatus) -> anyhow::Result<()> { Ok(()) } async fn update_following_status(&self, _: uuid::Uuid, _: &str, _: FollowingStatus) -> anyhow::Result<()> { Ok(()) }
async fn get_following_outbox_url(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) } async fn get_following_outbox_url(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<Option<String>> { Ok(None) }
async fn migrate_follower_actor(&self, _: &str, _: &str) -> anyhow::Result<Vec<uuid::Uuid>> { Ok(vec![]) }
}
// ── ActorRepository ───────────────────────────────────────────────────────────
#[derive(Default)]
struct MemActorRepo;
#[async_trait]
impl ActorRepository for MemActorRepo {
async fn get_local_actor_keypair(&self, _: uuid::Uuid) -> anyhow::Result<Option<(String, String)>> { Ok(None) }
async fn save_local_actor_keypair(&self, _: uuid::Uuid, _: String, _: String) -> anyhow::Result<()> { Ok(()) }
async fn upsert_remote_actor(&self, _: RemoteActor) -> anyhow::Result<()> { Ok(()) }
async fn get_remote_actor(&self, _: &str) -> anyhow::Result<Option<RemoteActor>> { Ok(None) }
async fn add_announce(&self, _: &str, _: &str, _: &str, _: DateTime<Utc>) -> anyhow::Result<()> { Ok(()) } async fn add_announce(&self, _: &str, _: &str, _: &str, _: DateTime<Utc>) -> anyhow::Result<()> { Ok(()) }
async fn count_announces(&self, _: &str) -> anyhow::Result<usize> { Ok(0) } async fn count_announces(&self, _: &str) -> anyhow::Result<usize> { Ok(0) }
async fn add_blocked_domain(&self, _: &str, _: Option<&str>) -> anyhow::Result<()> { Ok(()) } }
async fn remove_blocked_domain(&self, _: &str) -> anyhow::Result<()> { Ok(()) }
// ── BlocklistRepository ───────────────────────────────────────────────────────
struct MemBlocklistRepo {
blocked_domains: Mutex<HashSet<String>>,
}
impl MemBlocklistRepo {
fn with_blocked_domains(domains: impl IntoIterator<Item = String>) -> Self {
Self { blocked_domains: Mutex::new(domains.into_iter().collect()) }
}
}
impl Default for MemBlocklistRepo {
fn default() -> Self {
Self { blocked_domains: Mutex::new(HashSet::new()) }
}
}
#[async_trait]
impl BlocklistRepository for MemBlocklistRepo {
async fn add_blocked_domain(&self, domain: &str, _: Option<&str>) -> anyhow::Result<()> {
self.blocked_domains.lock().await.insert(domain.to_string());
Ok(())
}
async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> {
self.blocked_domains.lock().await.remove(domain);
Ok(())
}
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> { Ok(vec![]) } async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> { Ok(vec![]) }
async fn is_domain_blocked(&self, domain: &str) -> anyhow::Result<bool> {
Ok(self.blocked_domains.lock().await.contains(domain))
}
async fn add_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn add_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn remove_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } async fn remove_blocked_actor(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn get_blocked_actors(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> { Ok(vec![]) } async fn get_blocked_actors(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> { Ok(vec![]) }
async fn is_actor_blocked(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<bool> { Ok(false) } async fn is_actor_blocked(&self, _: uuid::Uuid, _: &str) -> anyhow::Result<bool> { Ok(false) }
async fn migrate_follower_actor(&self, _: &str, _: &str) -> anyhow::Result<Vec<uuid::Uuid>> { Ok(vec![]) }
async fn get_accepted_follower_inboxes(&self, _: uuid::Uuid) -> anyhow::Result<Vec<String>> { Ok(vec![]) }
} }
// ── In-memory ApUserRepository ──────────────────────────────────────────────── // ── ApUserRepository ──────────────────────────────────────────────────────────
struct MemUserRepo { struct MemUserRepo {
users: HashMap<uuid::Uuid, ApUser>, users: HashMap<uuid::Uuid, ApUser>,
@@ -113,7 +154,19 @@ impl ApUserRepository for MemUserRepo {
async fn count_users(&self) -> anyhow::Result<usize> { Ok(self.users.len()) } async fn count_users(&self) -> anyhow::Result<usize> { Ok(self.users.len()) }
} }
// ── In-memory ApObjectHandler ───────────────────────────────────────────────── // ── ApContentReader ───────────────────────────────────────────────────────────
#[derive(Default)]
struct MemContentReader;
#[async_trait]
impl ApContentReader for MemContentReader {
async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result<Vec<(Url, serde_json::Value)>> { Ok(vec![]) }
async fn get_local_objects_page(&self, _: uuid::Uuid, _: Option<DateTime<Utc>>, _: usize) -> anyhow::Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>> { Ok(vec![]) }
async fn count_local_posts(&self) -> anyhow::Result<u64> { Ok(0) }
}
// ── ApObjectHandler ───────────────────────────────────────────────────────────
#[derive(Default)] #[derive(Default)]
struct MemHandler { struct MemHandler {
@@ -123,8 +176,6 @@ struct MemHandler {
#[async_trait] #[async_trait]
impl ApObjectHandler for MemHandler { impl ApObjectHandler for MemHandler {
async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result<Vec<(Url, serde_json::Value)>> { Ok(vec![]) }
async fn get_local_objects_page(&self, _: uuid::Uuid, _: Option<DateTime<Utc>>, _: usize) -> anyhow::Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>> { Ok(vec![]) }
async fn on_create(&self, ap_id: &Url, _: &Url, _: serde_json::Value) -> anyhow::Result<()> { async fn on_create(&self, ap_id: &Url, _: &Url, _: serde_json::Value) -> anyhow::Result<()> {
self.creates.lock().await.push(ap_id.clone()); self.creates.lock().await.push(ap_id.clone());
Ok(()) Ok(())
@@ -133,26 +184,33 @@ impl ApObjectHandler for MemHandler {
async fn on_delete(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_delete(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) }
async fn on_actor_removed(&self, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_actor_removed(&self, _: &Url) -> anyhow::Result<()> { Ok(()) }
async fn on_like(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_like(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) }
async fn on_announce_received(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) }
async fn on_unlike(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) } async fn on_unlike(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) }
async fn on_announce_received(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) }
async fn on_announce_of_remote(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) }
async fn on_mention(&self, ap_id: &Url, user_id: uuid::Uuid, _: &Url) -> anyhow::Result<()> { async fn on_mention(&self, ap_id: &Url, user_id: uuid::Uuid, _: &Url) -> anyhow::Result<()> {
self.mentions.lock().await.push((ap_id.clone(), user_id)); self.mentions.lock().await.push((ap_id.clone(), user_id));
Ok(()) Ok(())
} }
async fn on_announce_of_remote(&self, _: &Url, _: &Url) -> anyhow::Result<()> { Ok(()) }
async fn count_local_posts(&self) -> anyhow::Result<u64> { Ok(0) }
} }
// ── Helper ──────────────────────────────────────────────────────────────────── // ── Helper ────────────────────────────────────────────────────────────────────
fn make_data( fn make_data(
repo: Arc<MemRepo>, activity_repo: Arc<MemActivityRepo>,
follow_repo: Arc<MemFollowRepo>,
actor_repo: Arc<MemActorRepo>,
blocklist_repo: Arc<MemBlocklistRepo>,
user_repo: Arc<MemUserRepo>, user_repo: Arc<MemUserRepo>,
content_reader: Arc<MemContentReader>,
handler: Arc<MemHandler>, handler: Arc<MemHandler>,
) -> FederationData { ) -> FederationData {
FederationData::new( FederationData::new(
repo, activity_repo,
follow_repo,
actor_repo,
blocklist_repo,
user_repo, user_repo,
content_reader,
handler, handler,
"https://example.com".to_string(), "https://example.com".to_string(),
false, false,
@@ -165,14 +223,19 @@ fn make_data(
#[tokio::test] #[tokio::test]
async fn check_guards_idempotency() { async fn check_guards_idempotency() {
use crate::activities::helpers::{already_processed, check_guards}; use crate::activities::helpers::check_guards;
use activitypub_federation::config::FederationConfig; use activitypub_federation::config::FederationConfig;
let repo = Arc::new(MemRepo::default()); let activity_repo = Arc::new(MemActivityRepo::default());
let user_repo = Arc::new(MemUserRepo::with_user(uuid::Uuid::new_v4(), "alice")); let data_inner = make_data(
let handler = Arc::new(MemHandler::default()); activity_repo,
let data_inner = make_data(repo, user_repo, handler); Arc::new(MemFollowRepo),
Arc::new(MemActorRepo),
Arc::new(MemBlocklistRepo::default()),
Arc::new(MemUserRepo::with_user(uuid::Uuid::new_v4(), "alice")),
Arc::new(MemContentReader),
Arc::new(MemHandler::default()),
);
let config = FederationConfig::builder() let config = FederationConfig::builder()
.domain("example.com") .domain("example.com")
.app_data(data_inner) .app_data(data_inner)
@@ -185,15 +248,12 @@ async fn check_guards_idempotency() {
let activity_id: Url = "https://remote.example/activities/abc123".parse().unwrap(); let activity_id: Url = "https://remote.example/activities/abc123".parse().unwrap();
let actor: Url = "https://remote.example/users/bob".parse().unwrap(); let actor: Url = "https://remote.example/users/bob".parse().unwrap();
// First call: not processed yet → should NOT skip
let skip = check_guards(&activity_id, &actor, &data).await.unwrap(); let skip = check_guards(&activity_id, &actor, &data).await.unwrap();
assert!(!skip, "first delivery should not be skipped"); assert!(!skip, "first delivery should not be skipped");
// Second call with same activity ID → should skip (duplicate)
let skip = check_guards(&activity_id, &actor, &data).await.unwrap(); let skip = check_guards(&activity_id, &actor, &data).await.unwrap();
assert!(skip, "duplicate delivery should be skipped"); assert!(skip, "duplicate delivery should be skipped");
// Different activity ID → should not skip
let other_id: Url = "https://remote.example/activities/xyz999".parse().unwrap(); let other_id: Url = "https://remote.example/activities/xyz999".parse().unwrap();
let skip = check_guards(&other_id, &actor, &data).await.unwrap(); let skip = check_guards(&other_id, &actor, &data).await.unwrap();
assert!(!skip, "different activity should not be skipped"); assert!(!skip, "different activity should not be skipped");
@@ -204,14 +264,18 @@ async fn check_guards_blocks_domain() {
use crate::activities::helpers::check_guards; use crate::activities::helpers::check_guards;
use activitypub_federation::config::FederationConfig; use activitypub_federation::config::FederationConfig;
let repo = Arc::new(MemRepo { let blocklist_repo = Arc::new(MemBlocklistRepo::with_blocked_domains(
blocked_domains: Mutex::new(["spam.example".to_string()].into()), ["spam.example".to_string()],
..Default::default() ));
}); let data_inner = make_data(
let user_repo = Arc::new(MemUserRepo::with_user(uuid::Uuid::new_v4(), "alice")); Arc::new(MemActivityRepo::default()),
let handler = Arc::new(MemHandler::default()); Arc::new(MemFollowRepo),
let data_inner = make_data(repo, user_repo, handler); Arc::new(MemActorRepo),
blocklist_repo,
Arc::new(MemUserRepo::with_user(uuid::Uuid::new_v4(), "alice")),
Arc::new(MemContentReader),
Arc::new(MemHandler::default()),
);
let config = FederationConfig::builder() let config = FederationConfig::builder()
.domain("example.com") .domain("example.com")
.app_data(data_inner) .app_data(data_inner)
@@ -224,7 +288,6 @@ async fn check_guards_blocks_domain() {
let activity_id: Url = "https://spam.example/activities/1".parse().unwrap(); let activity_id: Url = "https://spam.example/activities/1".parse().unwrap();
let actor: Url = "https://spam.example/users/evil".parse().unwrap(); let actor: Url = "https://spam.example/users/evil".parse().unwrap();
// Blocked domain → should skip
let skip = check_guards(&activity_id, &actor, &data).await.unwrap(); let skip = check_guards(&activity_id, &actor, &data).await.unwrap();
assert!(skip, "activity from blocked domain should be skipped"); assert!(skip, "activity from blocked domain should be skipped");
} }
@@ -235,11 +298,16 @@ async fn extract_and_dispatch_mentions_notifies_local_users() {
use activitypub_federation::config::FederationConfig; use activitypub_federation::config::FederationConfig;
let local_user_id = uuid::Uuid::new_v4(); let local_user_id = uuid::Uuid::new_v4();
let user_repo = Arc::new(MemUserRepo::with_user(local_user_id, "alice"));
let handler = Arc::new(MemHandler::default()); let handler = Arc::new(MemHandler::default());
let repo = Arc::new(MemRepo::default()); let data_inner = make_data(
let data_inner = make_data(repo, user_repo.clone(), handler.clone()); Arc::new(MemActivityRepo::default()),
Arc::new(MemFollowRepo),
Arc::new(MemActorRepo),
Arc::new(MemBlocklistRepo::default()),
Arc::new(MemUserRepo::with_user(local_user_id, "alice")),
Arc::new(MemContentReader),
handler.clone(),
);
let config = FederationConfig::builder() let config = FederationConfig::builder()
.domain("example.com") .domain("example.com")
.app_data(data_inner) .app_data(data_inner)
@@ -251,16 +319,12 @@ async fn extract_and_dispatch_mentions_notifies_local_users() {
let ap_id: Url = "https://remote.example/notes/1".parse().unwrap(); let ap_id: Url = "https://remote.example/notes/1".parse().unwrap();
let actor_url: Url = "https://remote.example/users/bob".parse().unwrap(); let actor_url: Url = "https://remote.example/users/bob".parse().unwrap();
// Object with a Mention tag pointing to local user URL
let local_user_url = format!("https://example.com/users/{}", local_user_id); let local_user_url = format!("https://example.com/users/{}", local_user_id);
let object = serde_json::json!({ let object = serde_json::json!({
"type": "Note", "type": "Note",
"id": ap_id.as_str(), "id": ap_id.as_str(),
"content": "Hello @alice", "content": "Hello @alice",
"tag": [ "tag": [{"type": "Mention", "href": local_user_url}]
{"type": "Mention", "href": local_user_url, "name": "@alice@example.com"}
]
}); });
extract_and_dispatch_mentions(&ap_id, &actor_url, &object, &data).await; extract_and_dispatch_mentions(&ap_id, &actor_url, &object, &data).await;