diff --git a/crates/application/src/services/federation_event.rs b/crates/application/src/services/federation_event.rs new file mode 100644 index 0000000..8254083 --- /dev/null +++ b/crates/application/src/services/federation_event.rs @@ -0,0 +1,281 @@ +use std::sync::Arc; +use domain::{ + errors::DomainError, + events::DomainEvent, + ports::{OutboundFederationPort, ThoughtRepository, UserRepository}, +}; + +pub struct FederationEventService { + pub thoughts: Arc, + pub users: Arc, + pub ap: Arc, + pub base_url: String, +} + +impl FederationEventService { + pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> { + match event { + DomainEvent::ThoughtCreated { thought_id, user_id, .. } => { + let thought = match self.thoughts.find_by_id(thought_id).await? { + Some(t) if t.local => t, + _ => return Ok(()), + }; + let user = match self.users.find_by_id(user_id).await? { + Some(u) => u, + None => return Ok(()), + }; + self.ap.broadcast_create(user_id, &thought, user.username.as_str()).await + } + + DomainEvent::ThoughtDeleted { thought_id, user_id } => { + let ap_id = format!("{}/thoughts/{}", self.base_url, thought_id); + self.ap.broadcast_delete(user_id, &ap_id).await + } + + DomainEvent::ThoughtUpdated { thought_id, user_id } => { + let thought = match self.thoughts.find_by_id(thought_id).await? { + Some(t) if t.local => t, + _ => return Ok(()), + }; + let user = match self.users.find_by_id(user_id).await? { + Some(u) => u, + None => return Ok(()), + }; + self.ap.broadcast_update(user_id, &thought, user.username.as_str()).await + } + + DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => { + let thought = match self.thoughts.find_by_id(thought_id).await? { + Some(t) => t, + None => return Ok(()), + }; + let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| { + format!("{}/thoughts/{}", self.base_url, thought_id) + }); + self.ap.broadcast_announce(user_id, &object_ap_id).await + } + + _ => Ok(()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use domain::{ + errors::DomainError, + events::DomainEvent, + models::thought::{Thought, Visibility}, + models::user::User, + ports::OutboundFederationPort, + testing::TestStore, + value_objects::*, + }; + use std::sync::{Arc, Mutex}; + + // ── Spy port ───────────────────────────────────────────────────────────── + + #[derive(Default)] + struct SpyPort { + created: Mutex>, + deleted: Mutex>, + updated: Mutex>, + announced: Mutex>, + } + + #[async_trait] + impl OutboundFederationPort for SpyPort { + async fn broadcast_create(&self, _: &UserId, thought: &Thought, _: &str) -> Result<(), DomainError> { + self.created.lock().unwrap().push(thought.id.clone()); + Ok(()) + } + async fn broadcast_delete(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> { + self.deleted.lock().unwrap().push(ap_id.to_string()); + Ok(()) + } + async fn broadcast_update(&self, _: &UserId, thought: &Thought, _: &str) -> Result<(), DomainError> { + self.updated.lock().unwrap().push(thought.id.clone()); + Ok(()) + } + async fn broadcast_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> { + self.announced.lock().unwrap().push(ap_id.to_string()); + Ok(()) + } + } + + fn alice() -> User { + User::new_local( + UserId::new(), + Username::new("alice").unwrap(), + Email::new("alice@ex.com").unwrap(), + PasswordHash("h".into()), + ) + } + + fn local_thought(author_id: UserId) -> Thought { + Thought::new_local( + ThoughtId::new(), author_id, + Content::new_local("hello").unwrap(), + None, Visibility::Public, None, false, + ) + } + + fn svc(store: &TestStore, spy: Arc) -> FederationEventService { + FederationEventService { + thoughts: Arc::new(store.clone()), + users: Arc::new(store.clone()), + ap: spy, + base_url: "https://example.com".to_string(), + } + } + + #[tokio::test] + async fn thought_created_broadcasts_create() { + let store = TestStore::default(); + let alice = alice(); + let thought = local_thought(alice.id.clone()); + store.users.lock().unwrap().push(alice.clone()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::ThoughtCreated { + thought_id: thought.id.clone(), + user_id: alice.id.clone(), + in_reply_to_id: None, + }) + .await + .unwrap(); + + assert_eq!(spy.created.lock().unwrap().len(), 1); + assert_eq!(spy.created.lock().unwrap()[0], thought.id); + } + + #[tokio::test] + async fn remote_thought_created_does_not_broadcast() { + let store = TestStore::default(); + let alice = alice(); + // Remote thought: local = false, ap_id = Some(...) + let mut thought = local_thought(alice.id.clone()); + thought.local = false; + thought.ap_id = Some("https://remote.example/notes/1".into()); + store.users.lock().unwrap().push(alice.clone()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::ThoughtCreated { + thought_id: thought.id.clone(), + user_id: alice.id.clone(), + in_reply_to_id: None, + }) + .await + .unwrap(); + + assert!(spy.created.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn thought_deleted_broadcasts_delete_with_constructed_ap_id() { + let store = TestStore::default(); + let alice = alice(); + let tid = ThoughtId::new(); + + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::ThoughtDeleted { + thought_id: tid.clone(), + user_id: alice.id.clone(), + }) + .await + .unwrap(); + + let deleted = spy.deleted.lock().unwrap(); + assert_eq!(deleted.len(), 1); + assert_eq!(deleted[0], format!("https://example.com/thoughts/{}", tid)); + } + + #[tokio::test] + async fn thought_updated_broadcasts_update() { + let store = TestStore::default(); + let alice = alice(); + let thought = local_thought(alice.id.clone()); + store.users.lock().unwrap().push(alice.clone()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::ThoughtUpdated { + thought_id: thought.id.clone(), + user_id: alice.id.clone(), + }) + .await + .unwrap(); + + assert_eq!(spy.updated.lock().unwrap().len(), 1); + } + + #[tokio::test] + async fn boost_of_local_thought_announces_constructed_url() { + let store = TestStore::default(); + let alice = alice(); + let thought = local_thought(alice.id.clone()); // ap_id = None + store.thoughts.lock().unwrap().push(thought.clone()); + + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::BoostAdded { + boost_id: BoostId::new(), + user_id: alice.id.clone(), + thought_id: thought.id.clone(), + }) + .await + .unwrap(); + + let announced = spy.announced.lock().unwrap(); + assert_eq!(announced.len(), 1); + assert_eq!(announced[0], format!("https://example.com/thoughts/{}", thought.id)); + } + + #[tokio::test] + async fn boost_of_remote_thought_announces_remote_ap_id() { + let store = TestStore::default(); + let alice = alice(); + let mut thought = local_thought(alice.id.clone()); + thought.local = false; + thought.ap_id = Some("https://mastodon.social/users/bob/statuses/123".into()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::BoostAdded { + boost_id: BoostId::new(), + user_id: alice.id.clone(), + thought_id: thought.id.clone(), + }) + .await + .unwrap(); + + let announced = spy.announced.lock().unwrap(); + assert_eq!(announced[0], "https://mastodon.social/users/bob/statuses/123"); + } + + #[tokio::test] + async fn unrelated_events_are_noop() { + let store = TestStore::default(); + let spy = Arc::new(SpyPort::default()); + let svc = svc(&store, spy.clone()); + + svc.process(&DomainEvent::UserBlocked { + blocker_id: UserId::new(), + blocked_id: UserId::new(), + }).await.unwrap(); + + assert!(spy.created.lock().unwrap().is_empty()); + assert!(spy.deleted.lock().unwrap().is_empty()); + assert!(spy.updated.lock().unwrap().is_empty()); + assert!(spy.announced.lock().unwrap().is_empty()); + } +} diff --git a/crates/application/src/services/mod.rs b/crates/application/src/services/mod.rs index 480fdc6..6116915 100644 --- a/crates/application/src/services/mod.rs +++ b/crates/application/src/services/mod.rs @@ -1,2 +1,5 @@ +pub mod federation_event; pub mod notification_event; + +pub use federation_event::FederationEventService; pub use notification_event::NotificationEventService;