# Federation Handler Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Replace the `FederationHandler` stub with a real implementation that fans out content events (ThoughtCreated/Deleted/Updated, BoostAdded) as ActivityPub activities, while simultaneously refactoring both worker handlers to be thin adapters over application-layer event services. **Architecture:** Domain defines `OutboundFederationPort`; application holds `FederationEventService` and `NotificationEventService` (business logic); `activitypub-base`'s `ActivityPubService` implements the port; worker handlers are one-liners that call the services. A new `worker/src/factory.rs` owns all dependency construction; `main.rs` stays tiny. **Dependency chain after refactor:** ``` domain ← application ← worker domain ← activitypub-base (impl OutboundFederationPort) bootstrap/worker → postgres, postgres-federation, activitypub, activitypub-base (composition roots only) ``` **Events handled in FederationHandler (async fan-out only):** - `ThoughtCreated` → `Create(Note)` to local-user followers (local thoughts only) - `ThoughtDeleted` → `Delete(Note)` to followers - `ThoughtUpdated` → `Update(Note)` to followers - `BoostAdded` → `Announce` to followers - All others → no-op (Follow/Accept/Reject/Block dispatched synchronously in HTTP handlers) --- ## File Map ``` Modify: crates/domain/src/ports.rs + OutboundFederationPort trait (4 methods) Create: crates/application/src/services/mod.rs Create: crates/application/src/services/notification_event.rs Create: crates/application/src/services/federation_event.rs Modify: crates/application/src/lib.rs + pub mod services Modify: crates/adapters/activitypub-base/src/activities.rs + to/cc fields on AnnounceActivity Modify: crates/adapters/activitypub-base/src/service.rs + broadcast_announce_to_followers() + impl OutboundFederationPort for ActivityPubService Modify: crates/worker/src/handlers.rs — remove all business logic, keep thin delegation wrappers Create: crates/worker/src/factory.rs + build() → builds all deps and returns (consumer, handlers) Modify: crates/worker/src/main.rs — call factory::build(), keep event loop only Modify: crates/worker/Cargo.toml + activitypub-base, activitypub, postgres-federation, application ``` --- ### Task 1: OutboundFederationPort in domain **Files:** - Modify: `crates/domain/src/ports.rs` - [ ] **Add `OutboundFederationPort` to `crates/domain/src/ports.rs`** — insert after the `ActivityPubRepository` trait: ```rust #[async_trait] pub trait OutboundFederationPort: Send + Sync { /// Fan out a new local Note to all accepted followers. async fn broadcast_create( &self, author_user_id: &UserId, thought: &Thought, author_username: &str, ) -> Result<(), DomainError>; /// Fan out a Delete tombstone for a now-deleted local Note. /// `thought_ap_id` is pre-constructed by the caller because the thought /// has already been deleted from the DB when this fires. async fn broadcast_delete( &self, author_user_id: &UserId, thought_ap_id: &str, ) -> Result<(), DomainError>; /// Fan out an Update(Note) for an edited local thought. async fn broadcast_update( &self, author_user_id: &UserId, thought: &Thought, author_username: &str, ) -> Result<(), DomainError>; /// Fan out an Announce(object_ap_id) for a boost. async fn broadcast_announce( &self, booster_user_id: &UserId, object_ap_id: &str, ) -> Result<(), DomainError>; } ``` - [ ] **Run:** `cargo check -p domain` — Expected: no errors. - [ ] **Commit:** ```bash git add crates/domain/src/ports.rs git commit -m "feat(domain): OutboundFederationPort — thin AP broadcast abstraction" ``` --- ### Task 2: NotificationEventService in application **Files:** - Create: `crates/application/src/services/mod.rs` - Create: `crates/application/src/services/notification_event.rs` - Modify: `crates/application/src/lib.rs` - [ ] **Write failing tests** at the bottom of `crates/application/src/services/notification_event.rs` (file doesn't exist yet — create it): ```rust #[cfg(test)] mod tests { use super::*; use domain::{ models::{thought::{Thought, Visibility}, user::User}, testing::TestStore, value_objects::*, }; use std::sync::Arc; fn alice() -> User { User::new_local( UserId::new(), Username::new("alice").unwrap(), Email::new("alice@ex.com").unwrap(), PasswordHash("h".into()), ) } #[tokio::test] async fn like_creates_notification_for_thought_author() { let store = TestStore::default(); let alice = alice(); let bob_id = UserId::new(); let thought = Thought::new_local( ThoughtId::new(), alice.id.clone(), Content::new_local("hello").unwrap(), None, Visibility::Public, None, false, ); store.thoughts.lock().unwrap().push(thought.clone()); let svc = NotificationEventService { thoughts: Arc::new(store.clone()), notifications: Arc::new(store.clone()), }; svc.process(&DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: bob_id, thought_id: thought.id.clone(), }).await.unwrap(); let notifs = store.notifications.lock().unwrap(); assert_eq!(notifs.len(), 1); assert!(matches!(notifs[0].notification_type, NotificationType::Like)); } #[tokio::test] async fn self_like_creates_no_notification() { let store = TestStore::default(); let alice = alice(); let thought = Thought::new_local( ThoughtId::new(), alice.id.clone(), Content::new_local("hello").unwrap(), None, Visibility::Public, None, false, ); store.thoughts.lock().unwrap().push(thought.clone()); let svc = NotificationEventService { thoughts: Arc::new(store.clone()), notifications: Arc::new(store.clone()), }; svc.process(&DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: alice.id.clone(), thought_id: thought.id.clone(), }).await.unwrap(); assert!(store.notifications.lock().unwrap().is_empty()); } #[tokio::test] async fn follow_accepted_creates_notification() { let store = TestStore::default(); let alice = alice(); let bob_id = UserId::new(); let svc = NotificationEventService { thoughts: Arc::new(store.clone()), notifications: Arc::new(store.clone()), }; svc.process(&DomainEvent::FollowAccepted { follower_id: bob_id, following_id: alice.id.clone(), }).await.unwrap(); let notifs = store.notifications.lock().unwrap(); assert_eq!(notifs.len(), 1); assert!(matches!(notifs[0].notification_type, NotificationType::Follow)); } #[tokio::test] async fn reply_creates_notification_for_original_author() { let store = TestStore::default(); let alice = alice(); let bob_id = UserId::new(); let original = Thought::new_local( ThoughtId::new(), alice.id.clone(), Content::new_local("original").unwrap(), None, Visibility::Public, None, false, ); store.thoughts.lock().unwrap().push(original.clone()); let svc = NotificationEventService { thoughts: Arc::new(store.clone()), notifications: Arc::new(store.clone()), }; svc.process(&DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: bob_id, in_reply_to_id: Some(original.id.clone()), }).await.unwrap(); let notifs = store.notifications.lock().unwrap(); assert_eq!(notifs.len(), 1); assert!(matches!(notifs[0].notification_type, NotificationType::Reply)); } #[tokio::test] async fn self_reply_creates_no_notification() { let store = TestStore::default(); let alice = alice(); let original = Thought::new_local( ThoughtId::new(), alice.id.clone(), Content::new_local("original").unwrap(), None, Visibility::Public, None, false, ); store.thoughts.lock().unwrap().push(original.clone()); let svc = NotificationEventService { thoughts: Arc::new(store.clone()), notifications: Arc::new(store.clone()), }; svc.process(&DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: alice.id.clone(), in_reply_to_id: Some(original.id.clone()), }).await.unwrap(); assert!(store.notifications.lock().unwrap().is_empty()); } } ``` - [ ] **Run:** `cargo test -p application` — Expected: FAIL (no implementation yet). - [ ] **Create `crates/application/src/services/notification_event.rs`:** ```rust use std::sync::Arc; use chrono::Utc; use domain::{ errors::DomainError, events::DomainEvent, models::notification::{Notification, NotificationType}, ports::{NotificationRepository, ThoughtRepository}, value_objects::NotificationId, }; pub struct NotificationEventService { pub thoughts: Arc, pub notifications: Arc, } impl NotificationEventService { pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> { match event { DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => { let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) => t, None => return Ok(()), }; if thought.user_id == *user_id { return Ok(()); } self.notifications.save(&Notification { id: NotificationId::new(), user_id: thought.user_id, notification_type: NotificationType::Like, from_user_id: Some(user_id.clone()), thought_id: Some(thought_id.clone()), read: false, created_at: Utc::now(), }).await } DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => { let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) => t, None => return Ok(()), }; if thought.user_id == *user_id { return Ok(()); } self.notifications.save(&Notification { id: NotificationId::new(), user_id: thought.user_id, notification_type: NotificationType::Boost, from_user_id: Some(user_id.clone()), thought_id: Some(thought_id.clone()), read: false, created_at: Utc::now(), }).await } DomainEvent::FollowAccepted { follower_id, following_id } => { self.notifications.save(&Notification { id: NotificationId::new(), user_id: following_id.clone(), notification_type: NotificationType::Follow, from_user_id: Some(follower_id.clone()), thought_id: None, read: false, created_at: Utc::now(), }).await } DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => { let reply_to_id = match in_reply_to_id { Some(id) => id, None => return Ok(()), }; let original = match self.thoughts.find_by_id(reply_to_id).await? { Some(t) => t, None => return Ok(()), }; if original.user_id == *user_id { return Ok(()); } self.notifications.save(&Notification { id: NotificationId::new(), user_id: original.user_id, notification_type: NotificationType::Reply, from_user_id: Some(user_id.clone()), thought_id: Some(thought_id.clone()), read: false, created_at: Utc::now(), }).await } _ => Ok(()), } } } ``` - [ ] **Create `crates/application/src/services/mod.rs`:** ```rust pub mod federation_event; pub mod notification_event; pub use federation_event::FederationEventService; pub use notification_event::NotificationEventService; ``` - [ ] **Modify `crates/application/src/lib.rs`** — add `pub mod services;`: ```rust pub mod services; pub mod use_cases; ``` - [ ] **Run:** `cargo test -p application` — Expected: 5 notification tests pass. - [ ] **Commit:** ```bash git add crates/application/ git commit -m "feat(application): NotificationEventService — move notification business logic out of worker" ``` --- ### Task 3: FederationEventService in application **Files:** - Create: `crates/application/src/services/federation_event.rs` - Modify: `crates/application/src/services/mod.rs` (re-export) - [ ] **Write failing tests** inside `crates/application/src/services/federation_event.rs`: ```rust #[cfg(test)] mod tests { use super::*; use async_trait::async_trait; use domain::{ errors::DomainError, events::DomainEvent, models::thought::{Thought, Visibility}, models::user::User, ports::OutboundFederationPort, testing::TestStore, value_objects::*, }; use std::sync::{Arc, Mutex}; // ── Spy port ───────────────────────────────────────────────────────────── #[derive(Default)] struct SpyPort { created: Mutex>, deleted: Mutex>, updated: Mutex>, announced: Mutex>, } #[async_trait] impl OutboundFederationPort for SpyPort { async fn broadcast_create(&self, _: &UserId, thought: &Thought, _: &str) -> Result<(), DomainError> { self.created.lock().unwrap().push(thought.id.clone()); Ok(()) } async fn broadcast_delete(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> { self.deleted.lock().unwrap().push(ap_id.to_string()); Ok(()) } async fn broadcast_update(&self, _: &UserId, thought: &Thought, _: &str) -> Result<(), DomainError> { self.updated.lock().unwrap().push(thought.id.clone()); Ok(()) } async fn broadcast_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> { self.announced.lock().unwrap().push(ap_id.to_string()); Ok(()) } } fn alice() -> User { User::new_local( UserId::new(), Username::new("alice").unwrap(), Email::new("alice@ex.com").unwrap(), PasswordHash("h".into()), ) } fn local_thought(author_id: UserId) -> Thought { Thought::new_local( ThoughtId::new(), author_id, Content::new_local("hello").unwrap(), None, Visibility::Public, None, false, ) } fn svc(store: &TestStore, spy: Arc) -> FederationEventService { FederationEventService { thoughts: Arc::new(store.clone()), users: Arc::new(store.clone()), ap: spy, base_url: "https://example.com".to_string(), } } #[tokio::test] async fn thought_created_broadcasts_create() { let store = TestStore::default(); let alice = alice(); let thought = local_thought(alice.id.clone()); store.users.lock().unwrap().push(alice.clone()); store.thoughts.lock().unwrap().push(thought.clone()); let spy = Arc::new(SpyPort::default()); svc(&store, spy.clone()) .process(&DomainEvent::ThoughtCreated { thought_id: thought.id.clone(), user_id: alice.id.clone(), in_reply_to_id: None, }) .await .unwrap(); assert_eq!(spy.created.lock().unwrap().len(), 1); assert_eq!(spy.created.lock().unwrap()[0], thought.id); } #[tokio::test] async fn remote_thought_created_does_not_broadcast() { let store = TestStore::default(); let alice = alice(); // Remote thought: local = false, ap_id = Some(...) let mut thought = local_thought(alice.id.clone()); thought.local = false; thought.ap_id = Some("https://remote.example/notes/1".into()); store.users.lock().unwrap().push(alice.clone()); store.thoughts.lock().unwrap().push(thought.clone()); let spy = Arc::new(SpyPort::default()); svc(&store, spy.clone()) .process(&DomainEvent::ThoughtCreated { thought_id: thought.id.clone(), user_id: alice.id.clone(), in_reply_to_id: None, }) .await .unwrap(); assert!(spy.created.lock().unwrap().is_empty()); } #[tokio::test] async fn thought_deleted_broadcasts_delete_with_constructed_ap_id() { let store = TestStore::default(); let alice = alice(); let tid = ThoughtId::new(); let spy = Arc::new(SpyPort::default()); svc(&store, spy.clone()) .process(&DomainEvent::ThoughtDeleted { thought_id: tid.clone(), user_id: alice.id.clone(), }) .await .unwrap(); let deleted = spy.deleted.lock().unwrap(); assert_eq!(deleted.len(), 1); assert_eq!(deleted[0], format!("https://example.com/thoughts/{}", tid)); } #[tokio::test] async fn thought_updated_broadcasts_update() { let store = TestStore::default(); let alice = alice(); let thought = local_thought(alice.id.clone()); store.users.lock().unwrap().push(alice.clone()); store.thoughts.lock().unwrap().push(thought.clone()); let spy = Arc::new(SpyPort::default()); svc(&store, spy.clone()) .process(&DomainEvent::ThoughtUpdated { thought_id: thought.id.clone(), user_id: alice.id.clone(), }) .await .unwrap(); assert_eq!(spy.updated.lock().unwrap().len(), 1); } #[tokio::test] async fn boost_of_local_thought_announces_constructed_url() { let store = TestStore::default(); let alice = alice(); let thought = local_thought(alice.id.clone()); // ap_id = None store.thoughts.lock().unwrap().push(thought.clone()); let spy = Arc::new(SpyPort::default()); svc(&store, spy.clone()) .process(&DomainEvent::BoostAdded { boost_id: BoostId::new(), user_id: alice.id.clone(), thought_id: thought.id.clone(), }) .await .unwrap(); let announced = spy.announced.lock().unwrap(); assert_eq!(announced.len(), 1); assert_eq!(announced[0], format!("https://example.com/thoughts/{}", thought.id)); } #[tokio::test] async fn boost_of_remote_thought_announces_remote_ap_id() { let store = TestStore::default(); let alice = alice(); let mut thought = local_thought(alice.id.clone()); thought.local = false; thought.ap_id = Some("https://mastodon.social/users/bob/statuses/123".into()); store.thoughts.lock().unwrap().push(thought.clone()); let spy = Arc::new(SpyPort::default()); svc(&store, spy.clone()) .process(&DomainEvent::BoostAdded { boost_id: BoostId::new(), user_id: alice.id.clone(), thought_id: thought.id.clone(), }) .await .unwrap(); let announced = spy.announced.lock().unwrap(); assert_eq!(announced[0], "https://mastodon.social/users/bob/statuses/123"); } #[tokio::test] async fn unrelated_events_are_noop() { let store = TestStore::default(); let spy = Arc::new(SpyPort::default()); let svc = svc(&store, spy.clone()); svc.process(&DomainEvent::UserBlocked { blocker_id: UserId::new(), blocked_id: UserId::new(), }).await.unwrap(); assert!(spy.created.lock().unwrap().is_empty()); assert!(spy.deleted.lock().unwrap().is_empty()); assert!(spy.updated.lock().unwrap().is_empty()); assert!(spy.announced.lock().unwrap().is_empty()); } } ``` - [ ] **Run:** `cargo test -p application -- services::federation_event` — Expected: FAIL (no implementation). - [ ] **Write `crates/application/src/services/federation_event.rs`** — full file including tests already added above: ```rust use std::sync::Arc; use domain::{ errors::DomainError, events::DomainEvent, models::thought::Thought, ports::{OutboundFederationPort, ThoughtRepository, UserRepository}, value_objects::UserId, }; pub struct FederationEventService { pub thoughts: Arc, pub users: Arc, pub ap: Arc, pub base_url: String, } impl FederationEventService { pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> { match event { DomainEvent::ThoughtCreated { thought_id, user_id, .. } => { let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) if t.local => t, _ => return Ok(()), }; let user = match self.users.find_by_id(user_id).await? { Some(u) => u, None => return Ok(()), }; self.ap.broadcast_create(user_id, &thought, user.username.as_str()).await } DomainEvent::ThoughtDeleted { thought_id, user_id } => { let ap_id = format!("{}/thoughts/{}", self.base_url, thought_id); self.ap.broadcast_delete(user_id, &ap_id).await } DomainEvent::ThoughtUpdated { thought_id, user_id } => { let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) if t.local => t, _ => return Ok(()), }; let user = match self.users.find_by_id(user_id).await? { Some(u) => u, None => return Ok(()), }; self.ap.broadcast_update(user_id, &thought, user.username.as_str()).await } DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => { let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) => t, None => return Ok(()), }; let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| { format!("{}/thoughts/{}", self.base_url, thought_id) }); self.ap.broadcast_announce(user_id, &object_ap_id).await } _ => Ok(()), } } } ``` - [ ] **Update `crates/application/src/services/mod.rs`** to re-export both services: ```rust pub mod federation_event; pub mod notification_event; pub use federation_event::FederationEventService; pub use notification_event::NotificationEventService; ``` - [ ] **Run:** `cargo test -p application` — Expected: all 12 tests pass (5 notification + 7 federation). - [ ] **Commit:** ```bash git add crates/application/src/services/federation_event.rs crates/application/src/services/mod.rs git commit -m "feat(application): FederationEventService — content fan-out business logic" ``` --- ### Task 4: AnnounceActivity to/cc + impl OutboundFederationPort for ActivityPubService **Files:** - Modify: `crates/adapters/activitypub-base/src/activities.rs` - Modify: `crates/adapters/activitypub-base/src/service.rs` - [ ] **Add `to`/`cc` to `AnnounceActivity`** in `crates/adapters/activitypub-base/src/activities.rs` — replace the struct definition (fields only; leave `impl Activity` intact): ```rust #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct AnnounceActivity { pub(crate) id: Url, #[serde(rename = "type", default)] pub(crate) kind: AnnounceType, pub(crate) actor: ObjectId, pub(crate) object: Url, pub(crate) published: Option>, #[serde(skip_serializing_if = "Vec::is_empty", default)] pub(crate) to: Vec, #[serde(skip_serializing_if = "Vec::is_empty", default)] pub(crate) cc: Vec, } ``` - [ ] **Run:** `cargo check -p activitypub-base` — Expected: no errors (fields are optional in deserialization due to `default`). - [ ] **Add `broadcast_announce_to_followers`** to `ActivityPubService` in `crates/adapters/activitypub-base/src/service.rs` — insert before the `follow` method: ```rust /// Fan out an Announce activity to all accepted followers. pub async fn broadcast_announce_to_followers( &self, local_user_id: uuid::Uuid, object_ap_id: url::Url, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); let local_actor = get_local_actor(local_user_id, &data) .await .map_err(|e| anyhow::anyhow!("{e}"))?; let followers = data.federation_repo.get_followers(local_user_id).await?; let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default(); let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default(); let blocked_domain_set: std::collections::HashSet = blocked_domains.into_iter().map(|d| d.domain).collect(); let accepted: Vec<_> = followers .into_iter() .filter(|f| f.status == FollowerStatus::Accepted) .filter(|f| !blocked_set.contains(&f.actor.url)) .filter(|f| { let domain = url::Url::parse(&f.actor.inbox_url) .ok() .and_then(|u| u.host_str().map(|s| s.to_string())) .unwrap_or_default(); !blocked_domain_set.contains(&domain) }) .collect(); if accepted.is_empty() { return Ok(()); } let announce = AnnounceActivity { id: crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()), object: object_ap_id, published: Some(chrono::Utc::now()), to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; let inboxes = collect_inboxes(&accepted); let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( &activitypub_federation::protocol::context::WithContext::new_default(announce), &local_actor, inboxes, &data, ) .await?; let failures = send_with_retry(sends, &data).await; if !failures.is_empty() { tracing::warn!(count = failures.len(), "some Announce deliveries failed"); } Ok(()) } ``` - [ ] **Add `impl OutboundFederationPort for ActivityPubService`** at the bottom of `crates/adapters/activitypub-base/src/service.rs`, after the existing `impl ActivityPubService` block: ```rust #[async_trait::async_trait] impl domain::ports::OutboundFederationPort for ActivityPubService { async fn broadcast_create( &self, author_user_id: &domain::value_objects::UserId, thought: &domain::models::thought::Thought, author_username: &str, ) -> Result<(), domain::errors::DomainError> { let user_uuid = author_user_id.as_uuid(); let data = self.federation_config.to_request_data(); let local_actor = get_local_actor(user_uuid, &data) .await .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; let ap_id = url::Url::parse(&format!("{}/thoughts/{}", self.base_url, thought.id)) .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; let mut note = serde_json::json!({ "type": "Note", "id": ap_id.to_string(), "attributedTo": local_actor.ap_id.to_string(), "content": thought.content.as_str(), "published": thought.created_at.to_rfc3339(), "to": [crate::urls::AS_PUBLIC], "cc": [local_actor.followers_url.to_string()], "sensitive": thought.sensitive, }); if let Some(ref cw) = thought.content_warning { note["summary"] = serde_json::json!(cw); } if let Some(ref reply_url) = thought.in_reply_to_url { note["inReplyTo"] = serde_json::json!(reply_url); } self.broadcast_to_followers(user_uuid, ap_id, note) .await .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) } async fn broadcast_delete( &self, author_user_id: &domain::value_objects::UserId, thought_ap_id: &str, ) -> Result<(), domain::errors::DomainError> { let user_uuid = author_user_id.as_uuid(); let ap_id = url::Url::parse(thought_ap_id) .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; self.broadcast_delete_to_followers(user_uuid, ap_id) .await .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) } async fn broadcast_update( &self, author_user_id: &domain::value_objects::UserId, thought: &domain::models::thought::Thought, author_username: &str, ) -> Result<(), domain::errors::DomainError> { let user_uuid = author_user_id.as_uuid(); let data = self.federation_config.to_request_data(); let local_actor = get_local_actor(user_uuid, &data) .await .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; let ap_id = format!("{}/thoughts/{}", self.base_url, thought.id); let mut note = serde_json::json!({ "type": "Note", "id": ap_id, "attributedTo": local_actor.ap_id.to_string(), "content": thought.content.as_str(), "published": thought.created_at.to_rfc3339(), "to": [crate::urls::AS_PUBLIC], "cc": [local_actor.followers_url.to_string()], "sensitive": thought.sensitive, }); if let Some(ref cw) = thought.content_warning { note["summary"] = serde_json::json!(cw); } if let Some(ref reply_url) = thought.in_reply_to_url { note["inReplyTo"] = serde_json::json!(reply_url); } self.broadcast_update_to_followers(user_uuid, note) .await .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) } async fn broadcast_announce( &self, booster_user_id: &domain::value_objects::UserId, object_ap_id: &str, ) -> Result<(), domain::errors::DomainError> { let user_uuid = booster_user_id.as_uuid(); let ap_id = url::Url::parse(object_ap_id) .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; self.broadcast_announce_to_followers(user_uuid, ap_id) .await .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) } } ``` - [ ] **Run:** `cargo check -p activitypub-base` — Expected: no errors. - [ ] **Run:** `cargo check --workspace` — Expected: no errors. - [ ] **Commit:** ```bash git add crates/adapters/activitypub-base/ git commit -m "feat(activitypub-base): Announce broadcast + impl OutboundFederationPort for ActivityPubService" ``` --- ### Task 5: Thin worker handlers + factory + main **Files:** - Modify: `crates/worker/Cargo.toml` - Modify: `crates/worker/src/handlers.rs` - Create: `crates/worker/src/factory.rs` - Modify: `crates/worker/src/main.rs` - [ ] **Update `crates/worker/Cargo.toml`** — add missing deps: ```toml [package] name = "worker" version = "0.1.0" edition = "2021" [[bin]] name = "thoughts-worker" path = "src/main.rs" [dependencies] domain = { workspace = true } application = { workspace = true } nats = { workspace = true } event-payload = { workspace = true } event-transport = { workspace = true } activitypub-base = { workspace = true } activitypub = { workspace = true } postgres = { workspace = true } postgres-federation = { workspace = true } async-nats = { workspace = true } tokio = { workspace = true, features = ["full"] } futures = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } dotenvy = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } sqlx = { workspace = true } [dev-dependencies] domain = { workspace = true, features = ["test-helpers"] } ``` - [ ] **Rewrite `crates/worker/src/handlers.rs`** — thin delegation wrappers only, all tests removed (they now live in `application`): ```rust use std::sync::Arc; use application::services::{FederationEventService, NotificationEventService}; use domain::{errors::DomainError, events::DomainEvent}; pub struct NotificationHandler { pub service: Arc, } impl NotificationHandler { pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { self.service.process(event).await } } pub struct FederationHandler { pub service: Arc, } impl FederationHandler { pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { self.service.process(event).await } } ``` - [ ] **Create `crates/worker/src/factory.rs`:** ```rust use std::sync::Arc; use sqlx::PgPool; use activitypub::ThoughtsObjectHandler; use activitypub_base::ActivityPubService; use application::services::{FederationEventService, NotificationEventService}; use postgres::activitypub::PgActivityPubRepository; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use crate::handlers::{FederationHandler, NotificationHandler}; pub struct WorkerHandlers { pub notification: NotificationHandler, pub federation: FederationHandler, } pub async fn build( database_url: &str, base_url: &str, nats_url: &str, ) -> ( event_transport::EventConsumerAdapter, WorkerHandlers, ) { let pool = PgPool::connect(database_url) .await .expect("DB connect failed"); // Repos let thoughts = Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())); let users = Arc::new(postgres::user::PgUserRepository::new(pool.clone())); let notifications = Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())); // ActivityPub service (for federation fan-out) let ap_service: Arc = Arc::new( ActivityPubService::new( Arc::new(PostgresFederationRepository::new(pool.clone())), Arc::new(PostgresApUserRepository::new(pool.clone(), base_url.to_string())), Arc::new(ThoughtsObjectHandler::new( Arc::new(PgActivityPubRepository::new(pool.clone())), base_url, )), base_url.to_string(), false, "thoughts".to_string(), false, None, ) .await .expect("ActivityPubService build failed"), ); // Application services let notification_svc = Arc::new(NotificationEventService { thoughts: thoughts.clone(), notifications, }); let federation_svc = Arc::new(FederationEventService { thoughts, users, ap: ap_service, base_url: base_url.to_string(), }); // Thin handlers let handlers = WorkerHandlers { notification: NotificationHandler { service: notification_svc }, federation: FederationHandler { service: federation_svc }, }; // NATS consumer let nats_client = async_nats::connect(nats_url) .await .expect("NATS connect failed"); let consumer = event_transport::EventConsumerAdapter::new( nats::NatsMessageSource::new(nats_client), ); (consumer, handlers) } ``` - [ ] **Rewrite `crates/worker/src/main.rs`:** ```rust mod factory; mod handlers; use futures::StreamExt; use domain::ports::EventConsumer; #[tokio::main] async fn main() { dotenvy::dotenv().ok(); tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .init(); let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()); let base_url = std::env::var("BASE_URL").expect("BASE_URL required"); tracing::info!("Building worker..."); let (consumer, handlers) = factory::build(&database_url, &base_url, &nats_url).await; tracing::info!("Worker started, consuming events..."); let mut stream = consumer.consume(); while let Some(result) = stream.next().await { match result { Ok(envelope) => { let event = &envelope.event; tracing::debug!(?event, "received event"); let n = handlers.notification.handle(event).await; let f = handlers.federation.handle(event).await; if n.is_ok() && f.is_ok() { (envelope.ack)(); } else { if let Err(e) = n { tracing::error!("notification handler: {e}"); } if let Err(e) = f { tracing::error!("federation handler: {e}"); } (envelope.nack)(); } } Err(e) => tracing::error!("consumer error: {e}"), } } } ``` - [ ] **Run:** `cargo check -p worker` — Expected: no errors. - [ ] **Run:** `cargo check --workspace` — Expected: no errors. - [ ] **Run full test suite:** ```bash DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -5 ``` Expected: all tests pass including the 12 new application service tests. - [ ] **Commit:** ```bash git add crates/worker/ git commit -m "refactor(worker): thin handlers + factory — move all business logic to application services" ``` --- ## Self-Review **Spec coverage:** - ✅ `OutboundFederationPort` in domain, 4 methods in domain language (Task 1) - ✅ `NotificationEventService` in application, business logic out of worker (Task 2) - ✅ 5 notification tests in application crate (Task 2) - ✅ `FederationEventService` in application: ThoughtCreated/Deleted/Updated/BoostAdded (Task 3) - ✅ Remote thought guard: `local == false` → skip broadcast (Task 3) - ✅ 7 federation event tests including remote thought guard and remote-boost AP ID (Task 3) - ✅ `to`/`cc` added to `AnnounceActivity` for AP compliance (Task 4) - ✅ `broadcast_announce_to_followers` respects blocked actors/domains (Task 4) - ✅ `impl OutboundFederationPort for ActivityPubService` builds Note JSON with `inReplyTo`, `summary`, `sensitive` (Task 4) - ✅ `worker/src/factory.rs` owns all composition — main.rs stays tiny (Task 5) - ✅ Worker handlers are one-liner delegations (Task 5) - ✅ Follow/Accept/Reject/Block remain synchronous in HTTP handlers — unchanged **Placeholder scan:** None. **Type consistency:** - `UserId::as_uuid()` used in impl — confirmed available in `value_objects.rs:11` - `Content::as_str()`, `Username::as_str()` — confirmed available - `Thought.local: bool` — used for guard in `FederationEventService` - `Thought.ap_id: Option` — used for boost AP ID construction - `ActivityPubService::broadcast_to_followers(uuid::Uuid, Url, Value)` — matches existing signature - `broadcast_update_to_followers(uuid::Uuid, Value)` — matches existing signature - `ThoughtsObjectHandler::new(Arc, &str)` — matches bootstrap factory usage - `PostgresApUserRepository::new(PgPool, String)` — matches bootstrap factory usage