feat: v2 rewrite — hexagonal arch, ActivityPub federation, NATS, deployment-ready #1
281
crates/application/src/services/federation_event.rs
Normal file
281
crates/application/src/services/federation_event.rs
Normal file
@@ -0,0 +1,281 @@
|
||||
use std::sync::Arc;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
ports::{OutboundFederationPort, ThoughtRepository, UserRepository},
|
||||
};
|
||||
|
||||
pub struct FederationEventService {
|
||||
pub thoughts: Arc<dyn ThoughtRepository>,
|
||||
pub users: Arc<dyn UserRepository>,
|
||||
pub ap: Arc<dyn OutboundFederationPort>,
|
||||
pub base_url: String,
|
||||
}
|
||||
|
||||
impl FederationEventService {
|
||||
pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
match event {
|
||||
DomainEvent::ThoughtCreated { thought_id, user_id, .. } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.local => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let user = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
self.ap.broadcast_create(user_id, &thought, user.username.as_str()).await
|
||||
}
|
||||
|
||||
DomainEvent::ThoughtDeleted { thought_id, user_id } => {
|
||||
let ap_id = format!("{}/thoughts/{}", self.base_url, thought_id);
|
||||
self.ap.broadcast_delete(user_id, &ap_id).await
|
||||
}
|
||||
|
||||
DomainEvent::ThoughtUpdated { thought_id, user_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.local => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let user = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
self.ap.broadcast_update(user_id, &thought, user.username.as_str()).await
|
||||
}
|
||||
|
||||
DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| {
|
||||
format!("{}/thoughts/{}", self.base_url, thought_id)
|
||||
});
|
||||
self.ap.broadcast_announce(user_id, &object_ap_id).await
|
||||
}
|
||||
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::thought::{Thought, Visibility},
|
||||
models::user::User,
|
||||
ports::OutboundFederationPort,
|
||||
testing::TestStore,
|
||||
value_objects::*,
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
// ── Spy port ─────────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Default)]
|
||||
struct SpyPort {
|
||||
created: Mutex<Vec<ThoughtId>>,
|
||||
deleted: Mutex<Vec<String>>,
|
||||
updated: Mutex<Vec<ThoughtId>>,
|
||||
announced: Mutex<Vec<String>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl OutboundFederationPort for SpyPort {
|
||||
async fn broadcast_create(&self, _: &UserId, thought: &Thought, _: &str) -> Result<(), DomainError> {
|
||||
self.created.lock().unwrap().push(thought.id.clone());
|
||||
Ok(())
|
||||
}
|
||||
async fn broadcast_delete(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
|
||||
self.deleted.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
async fn broadcast_update(&self, _: &UserId, thought: &Thought, _: &str) -> Result<(), DomainError> {
|
||||
self.updated.lock().unwrap().push(thought.id.clone());
|
||||
Ok(())
|
||||
}
|
||||
async fn broadcast_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
|
||||
self.announced.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn alice() -> User {
|
||||
User::new_local(
|
||||
UserId::new(),
|
||||
Username::new("alice").unwrap(),
|
||||
Email::new("alice@ex.com").unwrap(),
|
||||
PasswordHash("h".into()),
|
||||
)
|
||||
}
|
||||
|
||||
fn local_thought(author_id: UserId) -> Thought {
|
||||
Thought::new_local(
|
||||
ThoughtId::new(), author_id,
|
||||
Content::new_local("hello").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
)
|
||||
}
|
||||
|
||||
fn svc(store: &TestStore, spy: Arc<SpyPort>) -> FederationEventService {
|
||||
FederationEventService {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
users: Arc::new(store.clone()),
|
||||
ap: spy,
|
||||
base_url: "https://example.com".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_created_broadcasts_create() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone());
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(spy.created.lock().unwrap().len(), 1);
|
||||
assert_eq!(spy.created.lock().unwrap()[0], thought.id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_thought_created_does_not_broadcast() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
// Remote thought: local = false, ap_id = Some(...)
|
||||
let mut thought = local_thought(alice.id.clone());
|
||||
thought.local = false;
|
||||
thought.ap_id = Some("https://remote.example/notes/1".into());
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.created.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_deleted_broadcasts_delete_with_constructed_ap_id() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let tid = ThoughtId::new();
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtDeleted {
|
||||
thought_id: tid.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let deleted = spy.deleted.lock().unwrap();
|
||||
assert_eq!(deleted.len(), 1);
|
||||
assert_eq!(deleted[0], format!("https://example.com/thoughts/{}", tid));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_updated_broadcasts_update() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone());
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtUpdated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(spy.updated.lock().unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_of_local_thought_announces_constructed_url() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone()); // ap_id = None
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::BoostAdded {
|
||||
boost_id: BoostId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let announced = spy.announced.lock().unwrap();
|
||||
assert_eq!(announced.len(), 1);
|
||||
assert_eq!(announced[0], format!("https://example.com/thoughts/{}", thought.id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_of_remote_thought_announces_remote_ap_id() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let mut thought = local_thought(alice.id.clone());
|
||||
thought.local = false;
|
||||
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/123".into());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::BoostAdded {
|
||||
boost_id: BoostId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let announced = spy.announced.lock().unwrap();
|
||||
assert_eq!(announced[0], "https://mastodon.social/users/bob/statuses/123");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unrelated_events_are_noop() {
|
||||
let store = TestStore::default();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
let svc = svc(&store, spy.clone());
|
||||
|
||||
svc.process(&DomainEvent::UserBlocked {
|
||||
blocker_id: UserId::new(),
|
||||
blocked_id: UserId::new(),
|
||||
}).await.unwrap();
|
||||
|
||||
assert!(spy.created.lock().unwrap().is_empty());
|
||||
assert!(spy.deleted.lock().unwrap().is_empty());
|
||||
assert!(spy.updated.lock().unwrap().is_empty());
|
||||
assert!(spy.announced.lock().unwrap().is_empty());
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,5 @@
|
||||
pub mod federation_event;
|
||||
pub mod notification_event;
|
||||
|
||||
pub use federation_event::FederationEventService;
|
||||
pub use notification_event::NotificationEventService;
|
||||
|
||||
Reference in New Issue
Block a user