From 5a65fda0bc4e1b86e16c26b51f27252ca21caf51 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 4 Jun 2026 23:44:01 +0200 Subject: [PATCH] refactor: move federation port types from adapter to domain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ActivityPubRepository→FederationContentRepository, OutboundFederationPort→FederationBroadcastPort, ActorApUrls→ActorFederationUrls. Removes activitypub dep from application and presentation crates. Adapter re-exports old names as aliases for backward compat. Also fixes count_users test broken by instance actor migration. --- Cargo.lock | 2 - crates/adapters/activitypub/src/port.rs | 179 +----------------- .../adapters/postgres-federation/src/lib.rs | 3 +- crates/application/Cargo.toml | 1 - .../src/services/federation_event/mod.rs | 7 +- .../src/services/federation_event/tests.rs | 6 +- crates/application/src/testing.rs | 23 +-- .../use_cases/federation_management/mod.rs | 10 +- crates/domain/src/ports.rs | 133 +++++++++++++ crates/presentation/Cargo.toml | 1 - .../src/handlers/federation_actors/mod.rs | 4 +- crates/presentation/src/state.rs | 3 +- crates/presentation/src/testing.rs | 23 ++- 13 files changed, 180 insertions(+), 215 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23816f6..4732bb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -273,7 +273,6 @@ dependencies = [ name = "application" version = "0.1.0" dependencies = [ - "activitypub", "async-trait", "bytes", "chrono", @@ -2605,7 +2604,6 @@ dependencies = [ name = "presentation" version = "0.1.0" dependencies = [ - "activitypub", "api-types", "application", "async-trait", diff --git a/crates/adapters/activitypub/src/port.rs b/crates/adapters/activitypub/src/port.rs index 909da62..35b29e0 100644 --- a/crates/adapters/activitypub/src/port.rs +++ b/crates/adapters/activitypub/src/port.rs @@ -1,176 +1,5 @@ -use async_trait::async_trait; -use domain::{ - errors::DomainError, - models::thought::Thought, - value_objects::{ThoughtId, UserId, Username}, +pub use domain::ports::{ + AcceptNoteInput, ActorFederationUrls as ActorApUrls, + FederationBroadcastPort as OutboundFederationPort, + FederationContentRepository as ActivityPubRepository, OutboxEntry, }; - -pub struct AcceptNoteInput<'a> { - pub ap_id: &'a str, - pub author_id: &'a UserId, - pub content: &'a str, - pub published: chrono::DateTime, - pub sensitive: bool, - pub content_warning: Option, - pub visibility: &'a str, - pub in_reply_to: Option<&'a str>, - pub note_extensions: Option, -} - -/// AP-protocol endpoints for a locally-stored user (local or interned remote). -#[derive(Debug, Clone)] -pub struct ActorApUrls { - pub ap_id: String, - pub inbox_url: String, -} - -/// A local thought ready for AP serialization, with the author's username -/// pre-joined so the handler can build AP URLs without a second query. -#[derive(Debug, Clone)] -pub struct OutboxEntry { - pub thought: Thought, - pub author_username: Username, -} - -#[async_trait] -pub trait ActivityPubRepository: Send + Sync { - // ── Outbox (local → remote) ────────────────────────────────────── - - /// All public local thoughts for this actor. Used for outbox totals - /// and full-collection delivery. - async fn outbox_entries_for_actor( - &self, - user_id: &UserId, - ) -> Result, DomainError>; - - /// Cursor page of public local thoughts, newest-first, before `before`. - /// Used for OrderedCollectionPage responses. - async fn outbox_page_for_actor( - &self, - user_id: &UserId, - before: Option>, - limit: usize, - ) -> Result, DomainError>; - - // ── Remote actor resolution ────────────────────────────────────── - - /// Find the local UserId for a remote actor by its AP URL. - async fn find_remote_actor_id(&self, actor_ap_url: &str) - -> Result, DomainError>; - - /// Ensure a remote actor placeholder exists; create one if absent. - /// Idempotent — safe to call multiple times with the same URL. - async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result; - - /// Update display_name and avatar_url for an already-interned remote actor. - async fn update_remote_actor_display( - &self, - user_id: &UserId, - display_name: Option<&str>, - avatar_url: Option<&str>, - ) -> Result<(), DomainError>; - - // ── Inbox processing (remote → local) ─────────────────────────── - - /// Persist an incoming remote Note. Idempotent on ap_id. - async fn accept_note(&self, input: AcceptNoteInput<'_>) -> Result; - - /// Apply an Update to a previously accepted remote Note. - async fn apply_note_update( - &self, - ap_id: &str, - new_content: &str, - note_extensions: Option, - ) -> Result<(), DomainError>; - - /// Remove a specific remote Note (Delete activity). Only touches - /// remotely-originated thoughts. - async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError>; - - /// Remove all Notes from a remote actor (actor-level Delete/Tombstone). - async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError>; - - // ── Node-level stats ───────────────────────────────────────────── - - /// Total locally-authored thought count for NodeInfo responses. - async fn count_local_notes(&self) -> Result; - - /// Return the ActivityPub object URL for a thought, if one is stored. - /// Returns None for local thoughts (caller constructs URL from base_url + thought_id). - async fn get_thought_ap_id( - &self, - thought_id: &ThoughtId, - ) -> Result, DomainError>; - - /// Return the AP actor URL and inbox URL for a user, if stored. - /// Returns None for users that have not been federated. - async fn get_actor_ap_urls(&self, user_id: &UserId) - -> Result, DomainError>; - - /// Sync display_name + avatar_url from remote_actors to users table. - async fn sync_remote_actor_to_user(&self, actor_ap_url: &str) -> Result<(), DomainError>; -} - -#[async_trait] -pub trait OutboundFederationPort: Send + Sync { - /// Fan out a new local Note to all accepted followers. - async fn broadcast_create( - &self, - author_user_id: &UserId, - thought: &Thought, - author_username: &str, - in_reply_to_url: Option<&str>, - ) -> Result<(), DomainError>; - - /// Fan out a Delete tombstone for a now-deleted local Note. - /// `thought_ap_id` is pre-constructed by the caller because the thought - /// has already been deleted from the DB when this fires. - async fn broadcast_delete( - &self, - author_user_id: &UserId, - thought_ap_id: &str, - ) -> Result<(), DomainError>; - - /// Fan out an Update(Note) for an edited local thought. - async fn broadcast_update( - &self, - author_user_id: &UserId, - thought: &Thought, - author_username: &str, - in_reply_to_url: Option<&str>, - ) -> Result<(), DomainError>; - - /// Fan out an Announce(object_ap_id) for a boost. - async fn broadcast_announce( - &self, - booster_user_id: &UserId, - object_ap_id: &str, - ) -> Result<(), DomainError>; - - /// Fan out an Undo(Announce) to followers when a boost is removed. - async fn broadcast_undo_announce( - &self, - booster_user_id: &UserId, - object_ap_id: &str, - ) -> Result<(), DomainError>; - - /// Send a Like activity to a remote thought author's inbox. - /// Only called when a LOCAL user likes a REMOTE thought (one with an ap_id). - async fn broadcast_like( - &self, - liker_user_id: &UserId, - object_ap_id: &str, - author_inbox_url: &str, - ) -> Result<(), DomainError>; - - /// Send Undo(Like) to a remote thought author's inbox. - async fn broadcast_undo_like( - &self, - liker_user_id: &UserId, - object_ap_id: &str, - author_inbox_url: &str, - ) -> Result<(), DomainError>; - - /// Fan out an Update(Actor) to all accepted followers after a profile change. - async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError>; -} diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index e4fdca0..e197044 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -919,6 +919,7 @@ mod tests { .unwrap(); let repo = PgApUserRepository::new(pool, "https://example.com".into()); - assert_eq!(repo.count_users().await.unwrap(), 2); + // 2 seeded local users + 1 instance actor from migration 022 + assert_eq!(repo.count_users().await.unwrap(), 3); } } diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index 8320bb1..7095534 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -5,7 +5,6 @@ edition = "2021" [dependencies] domain = { workspace = true } -activitypub = { workspace = true } async-trait = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true } diff --git a/crates/application/src/services/federation_event/mod.rs b/crates/application/src/services/federation_event/mod.rs index 4383ea6..a86117e 100644 --- a/crates/application/src/services/federation_event/mod.rs +++ b/crates/application/src/services/federation_event/mod.rs @@ -1,9 +1,8 @@ -use activitypub::{ActivityPubRepository, OutboundFederationPort}; use domain::{ errors::DomainError, events::DomainEvent, models::thought::Visibility, - ports::{ThoughtRepository, UserReader}, + ports::{FederationBroadcastPort, FederationContentRepository, ThoughtRepository, UserReader}, value_objects::ThoughtId, }; use std::sync::Arc; @@ -15,9 +14,9 @@ fn should_broadcast(t: &domain::models::thought::Thought) -> bool { pub struct FederationEventService { pub thoughts: Arc, pub users: Arc, - pub ap: Arc, + pub ap: Arc, pub base_url: String, - pub ap_repo: Arc, + pub ap_repo: Arc, } impl FederationEventService { diff --git a/crates/application/src/services/federation_event/tests.rs b/crates/application/src/services/federation_event/tests.rs index 9f184b9..03886b1 100644 --- a/crates/application/src/services/federation_event/tests.rs +++ b/crates/application/src/services/federation_event/tests.rs @@ -1,7 +1,7 @@ use super::*; use crate::testing::TestApRepo; -use activitypub::{ActorApUrls, OutboundFederationPort}; use async_trait::async_trait; +use domain::ports::{ActorFederationUrls, FederationBroadcastPort}; use domain::{ errors::DomainError, events::DomainEvent, @@ -27,7 +27,7 @@ struct SpyPort { } #[async_trait] -impl OutboundFederationPort for SpyPort { +impl FederationBroadcastPort for SpyPort { async fn broadcast_create( &self, _: &UserId, @@ -482,7 +482,7 @@ async fn like_added_local_user_remote_thought_broadcasts_like() { let ap_repo = TestApRepo::new(store.clone()); ap_repo.actor_ap_urls.lock().unwrap().insert( author.id.clone(), - ActorApUrls { + ActorFederationUrls { ap_id: "https://mastodon.social/users/author".into(), inbox_url: "https://mastodon.social/users/author/inbox".into(), }, diff --git a/crates/application/src/testing.rs b/crates/application/src/testing.rs index ecae317..c143d70 100644 --- a/crates/application/src/testing.rs +++ b/crates/application/src/testing.rs @@ -1,9 +1,8 @@ -/// Test helpers for application-layer tests that need activitypub traits. -use activitypub::{ActivityPubRepository, ActorApUrls, OutboxEntry}; use async_trait::async_trait; use domain::{ errors::DomainError, models::user::User, + ports::{AcceptNoteInput, ActorFederationUrls, FederationContentRepository, OutboxEntry}, testing::TestStore, value_objects::{Email, ThoughtId, UserId, Username}, }; @@ -14,8 +13,8 @@ use std::sync::{Arc, Mutex}; #[derive(Default, Clone)] pub struct TestApRepo { pub inner: TestStore, - /// UserId → ActorApUrls (for get_actor_ap_urls) - pub actor_ap_urls: Arc>>, + /// UserId → ActorFederationUrls (for get_actor_ap_urls) + pub actor_ap_urls: Arc>>, } impl TestApRepo { @@ -28,7 +27,7 @@ impl TestApRepo { } #[async_trait] -impl ActivityPubRepository for TestApRepo { +impl FederationContentRepository for TestApRepo { async fn outbox_entries_for_actor( &self, _uid: &UserId, @@ -84,13 +83,15 @@ impl ActivityPubRepository for TestApRepo { ) -> Result<(), DomainError> { Ok(()) } - async fn accept_note( - &self, - _input: activitypub::AcceptNoteInput<'_>, - ) -> Result { + async fn accept_note(&self, _input: AcceptNoteInput<'_>) -> Result { Ok(ThoughtId::from_uuid(uuid::Uuid::new_v4())) } - async fn apply_note_update(&self, _ap_id: &str, _new_content: &str, _: Option) -> Result<(), DomainError> { + async fn apply_note_update( + &self, + _ap_id: &str, + _new_content: &str, + _: Option, + ) -> Result<(), DomainError> { Ok(()) } async fn retract_note(&self, _ap_id: &str) -> Result<(), DomainError> { @@ -124,7 +125,7 @@ impl ActivityPubRepository for TestApRepo { async fn get_actor_ap_urls( &self, user_id: &UserId, - ) -> Result, DomainError> { + ) -> Result, DomainError> { Ok(self.actor_ap_urls.lock().unwrap().get(user_id).cloned()) } async fn sync_remote_actor_to_user(&self, _actor_ap_url: &str) -> Result<(), DomainError> { diff --git a/crates/application/src/use_cases/federation_management/mod.rs b/crates/application/src/use_cases/federation_management/mod.rs index 7a5f963..9c4f967 100644 --- a/crates/application/src/use_cases/federation_management/mod.rs +++ b/crates/application/src/use_cases/federation_management/mod.rs @@ -1,4 +1,3 @@ -use activitypub::ActivityPubRepository; use domain::{ errors::DomainError, events::DomainEvent, @@ -8,9 +7,10 @@ use domain::{ remote_actor::RemoteActor, }, ports::{ - EventPublisher, FederationActionPort, FederationFollowPort, FederationFollowRequestPort, - FederationSchedulerPort, FeedOptions, FeedQuery, FeedRepository, FeedRequest, - FollowRepository, RemoteActorConnectionRepository, UserReader, UserWriter, + EventPublisher, FederationActionPort, FederationContentRepository, FederationFollowPort, + FederationFollowRequestPort, FederationSchedulerPort, FeedOptions, FeedQuery, + FeedRepository, FeedRequest, FollowRepository, RemoteActorConnectionRepository, UserReader, + UserWriter, }, value_objects::UserId, }; @@ -119,7 +119,7 @@ pub async fn remove_remote_following( pub async fn get_remote_actor_posts( federation: &dyn FederationActionPort, - ap_repo: &dyn ActivityPubRepository, + ap_repo: &dyn FederationContentRepository, feed: &dyn FeedRepository, scheduler: &dyn FederationSchedulerPort, handle: &str, diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index ac10335..4bcfb9a 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -511,3 +511,136 @@ pub trait FederationSchedulerPort: Send + Sync { page: u32, ) -> Result<(), DomainError>; } + +// ── Federation content & broadcast ports ──────────────────────────────── + +pub struct AcceptNoteInput<'a> { + pub ap_id: &'a str, + pub author_id: &'a UserId, + pub content: &'a str, + pub published: chrono::DateTime, + pub sensitive: bool, + pub content_warning: Option, + pub visibility: &'a str, + pub in_reply_to: Option<&'a str>, + pub note_extensions: Option, +} + +#[derive(Debug, Clone)] +pub struct ActorFederationUrls { + pub ap_id: String, + pub inbox_url: String, +} + +#[derive(Debug, Clone)] +pub struct OutboxEntry { + pub thought: Thought, + pub author_username: Username, +} + +#[async_trait] +pub trait FederationContentRepository: Send + Sync { + async fn outbox_entries_for_actor( + &self, + user_id: &UserId, + ) -> Result, DomainError>; + + async fn outbox_page_for_actor( + &self, + user_id: &UserId, + before: Option>, + limit: usize, + ) -> Result, DomainError>; + + async fn find_remote_actor_id(&self, actor_ap_url: &str) + -> Result, DomainError>; + + async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result; + + async fn update_remote_actor_display( + &self, + user_id: &UserId, + display_name: Option<&str>, + avatar_url: Option<&str>, + ) -> Result<(), DomainError>; + + async fn accept_note(&self, input: AcceptNoteInput<'_>) -> Result; + + async fn apply_note_update( + &self, + ap_id: &str, + new_content: &str, + note_extensions: Option, + ) -> Result<(), DomainError>; + + async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError>; + + async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError>; + + async fn count_local_notes(&self) -> Result; + + async fn get_thought_ap_id( + &self, + thought_id: &ThoughtId, + ) -> Result, DomainError>; + + async fn get_actor_ap_urls( + &self, + user_id: &UserId, + ) -> Result, DomainError>; + + async fn sync_remote_actor_to_user(&self, actor_ap_url: &str) -> Result<(), DomainError>; +} + +#[async_trait] +pub trait FederationBroadcastPort: Send + Sync { + async fn broadcast_create( + &self, + author_user_id: &UserId, + thought: &Thought, + author_username: &str, + in_reply_to_url: Option<&str>, + ) -> Result<(), DomainError>; + + async fn broadcast_delete( + &self, + author_user_id: &UserId, + thought_ap_id: &str, + ) -> Result<(), DomainError>; + + async fn broadcast_update( + &self, + author_user_id: &UserId, + thought: &Thought, + author_username: &str, + in_reply_to_url: Option<&str>, + ) -> Result<(), DomainError>; + + async fn broadcast_announce( + &self, + booster_user_id: &UserId, + object_ap_id: &str, + ) -> Result<(), DomainError>; + + async fn broadcast_undo_announce( + &self, + booster_user_id: &UserId, + object_ap_id: &str, + ) -> Result<(), DomainError>; + + async fn broadcast_like( + &self, + liker_user_id: &UserId, + object_ap_id: &str, + author_inbox_url: &str, + ) -> Result<(), DomainError>; + + async fn broadcast_undo_like( + &self, + liker_user_id: &UserId, + object_ap_id: &str, + author_inbox_url: &str, + ) -> Result<(), DomainError>; + + async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError>; +} diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index de07473..8befea7 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -5,7 +5,6 @@ edition = "2021" [dependencies] domain = { workspace = true } -activitypub = { workspace = true } application = { workspace = true } api-types = { workspace = true } axum = { workspace = true } diff --git a/crates/presentation/src/handlers/federation_actors/mod.rs b/crates/presentation/src/handlers/federation_actors/mod.rs index 68225d6..8833385 100644 --- a/crates/presentation/src/handlers/federation_actors/mod.rs +++ b/crates/presentation/src/handlers/federation_actors/mod.rs @@ -4,7 +4,6 @@ use crate::{ handlers::feed::to_thought_response, state::AppState, }; -use activitypub::ActivityPubRepository; use api_types::{ requests::PaginationQuery, responses::{ @@ -18,6 +17,7 @@ use axum::{ extract::{Path, Query}, Json, }; +use domain::ports::FederationContentRepository; use domain::{ models::feed::PageParams, ports::{ @@ -29,7 +29,7 @@ use std::sync::Arc; pub struct FederationActorsDeps { pub federation: Arc, - pub ap_repo: Arc, + pub ap_repo: Arc, pub feed: Arc, pub federation_scheduler: Arc, pub remote_actor_connections: Arc, diff --git a/crates/presentation/src/state.rs b/crates/presentation/src/state.rs index 6dec547..1e6b853 100644 --- a/crates/presentation/src/state.rs +++ b/crates/presentation/src/state.rs @@ -1,4 +1,3 @@ -use activitypub::ActivityPubRepository; use application::use_cases::profile::UploadConfig; use domain::ports::*; use std::sync::Arc; @@ -24,7 +23,7 @@ pub struct AppState { pub events: Arc, pub outbox: Arc, pub federation: Arc, - pub ap_repo: Arc, + pub ap_repo: Arc, pub remote_actor_connections: Arc, pub federation_scheduler: Arc, pub engagement: Arc, diff --git a/crates/presentation/src/testing.rs b/crates/presentation/src/testing.rs index 82be4fe..068da12 100644 --- a/crates/presentation/src/testing.rs +++ b/crates/presentation/src/testing.rs @@ -1,7 +1,9 @@ use crate::state::AppState; -use activitypub::{ActivityPubRepository, ActorApUrls, OutboxEntry}; use application::use_cases::profile::UploadConfig; use async_trait::async_trait; +use domain::ports::{ + AcceptNoteInput, ActorFederationUrls, FederationContentRepository, OutboxEntry, +}; use domain::{ errors::DomainError, ports::{AuthService, DataStream, GeneratedToken, MediaStore, PasswordHasher}, @@ -34,7 +36,7 @@ impl PasswordHasher for NoOpHasher { pub struct NoOpApRepo; #[async_trait] -impl ActivityPubRepository for NoOpApRepo { +impl FederationContentRepository for NoOpApRepo { async fn outbox_entries_for_actor(&self, _: &UserId) -> Result, DomainError> { Ok(vec![]) } @@ -60,13 +62,15 @@ impl ActivityPubRepository for NoOpApRepo { ) -> Result<(), DomainError> { Ok(()) } - async fn accept_note( - &self, - _: activitypub::AcceptNoteInput<'_>, - ) -> Result { + async fn accept_note(&self, _: AcceptNoteInput<'_>) -> Result { Ok(ThoughtId::from_uuid(uuid::Uuid::new_v4())) } - async fn apply_note_update(&self, _: &str, _: &str, _: Option) -> Result<(), DomainError> { + async fn apply_note_update( + &self, + _: &str, + _: &str, + _: Option, + ) -> Result<(), DomainError> { Ok(()) } async fn retract_note(&self, _: &str) -> Result<(), DomainError> { @@ -81,7 +85,10 @@ impl ActivityPubRepository for NoOpApRepo { async fn get_thought_ap_id(&self, _: &ThoughtId) -> Result, DomainError> { Ok(None) } - async fn get_actor_ap_urls(&self, _: &UserId) -> Result, DomainError> { + async fn get_actor_ap_urls( + &self, + _: &UserId, + ) -> Result, DomainError> { Ok(None) } async fn sync_remote_actor_to_user(&self, _: &str) -> Result<(), DomainError> {