Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m13s
test / unit (pull_request) Successful in 15m56s
test / integration (pull_request) Failing after 17m29s
675 lines
22 KiB
Rust
675 lines
22 KiB
Rust
use domain::{
|
|
errors::DomainError,
|
|
events::DomainEvent,
|
|
models::thought::{Thought, Visibility},
|
|
ports::{ActivityPubRepository, OutboundFederationPort, ThoughtRepository, UserRepository},
|
|
value_objects::ThoughtId,
|
|
};
|
|
use std::sync::Arc;
|
|
|
|
pub struct FederationEventService {
|
|
pub thoughts: Arc<dyn ThoughtRepository>,
|
|
pub users: Arc<dyn UserRepository>,
|
|
pub ap: Arc<dyn OutboundFederationPort>,
|
|
pub base_url: String,
|
|
pub federation_action: Arc<dyn domain::ports::FederationActionPort>,
|
|
pub ap_repo: Arc<dyn ActivityPubRepository>,
|
|
pub remote_actor_connections: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
|
|
}
|
|
|
|
impl FederationEventService {
|
|
fn object_ap_id(&self, thought: &Thought, thought_id: &ThoughtId) -> String {
|
|
thought
|
|
.ap_id
|
|
.clone()
|
|
.unwrap_or_else(|| format!("{}/thoughts/{}", self.base_url, thought_id))
|
|
}
|
|
|
|
pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
|
match event {
|
|
DomainEvent::ThoughtCreated {
|
|
thought_id,
|
|
user_id,
|
|
..
|
|
} => {
|
|
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
|
Some(t)
|
|
if t.local
|
|
&& matches!(
|
|
t.visibility,
|
|
Visibility::Public | Visibility::Unlisted
|
|
) =>
|
|
{
|
|
t
|
|
}
|
|
_ => return Ok(()),
|
|
};
|
|
let user = match self.users.find_by_id(user_id).await? {
|
|
Some(u) => u,
|
|
None => return Ok(()),
|
|
};
|
|
self.ap
|
|
.broadcast_create(user_id, &thought, user.username.as_str())
|
|
.await
|
|
}
|
|
|
|
DomainEvent::ThoughtDeleted {
|
|
thought_id,
|
|
user_id,
|
|
} => {
|
|
// No DB lookup — thought is already deleted when this event fires.
|
|
// No locality guard: delete commands only reach local thoughts via the use case.
|
|
let ap_id = format!("{}/thoughts/{}", self.base_url, thought_id);
|
|
self.ap.broadcast_delete(user_id, &ap_id).await
|
|
}
|
|
|
|
DomainEvent::ThoughtUpdated {
|
|
thought_id,
|
|
user_id,
|
|
} => {
|
|
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
|
Some(t)
|
|
if t.local
|
|
&& matches!(
|
|
t.visibility,
|
|
Visibility::Public | Visibility::Unlisted
|
|
) =>
|
|
{
|
|
t
|
|
}
|
|
_ => return Ok(()),
|
|
};
|
|
let user = match self.users.find_by_id(user_id).await? {
|
|
Some(u) => u,
|
|
None => return Ok(()),
|
|
};
|
|
self.ap
|
|
.broadcast_update(user_id, &thought, user.username.as_str())
|
|
.await
|
|
}
|
|
|
|
DomainEvent::BoostAdded {
|
|
boost_id: _,
|
|
user_id,
|
|
thought_id,
|
|
} => {
|
|
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
|
Some(t) => t,
|
|
None => return Ok(()),
|
|
};
|
|
let object_ap_id = self.object_ap_id(&thought, thought_id);
|
|
self.ap.broadcast_announce(user_id, &object_ap_id).await
|
|
}
|
|
|
|
DomainEvent::BoostRemoved {
|
|
user_id,
|
|
thought_id,
|
|
} => {
|
|
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
|
Some(t) => t,
|
|
None => return Ok(()),
|
|
};
|
|
let object_ap_id = self.object_ap_id(&thought, thought_id);
|
|
self.ap
|
|
.broadcast_undo_announce(user_id, &object_ap_id)
|
|
.await
|
|
}
|
|
|
|
DomainEvent::FetchRemoteActorPosts {
|
|
actor_ap_url,
|
|
outbox_url,
|
|
} => {
|
|
let notes = match self
|
|
.federation_action
|
|
.fetch_outbox_page(outbox_url, 1)
|
|
.await
|
|
{
|
|
Ok(n) => n,
|
|
Err(e) => {
|
|
tracing::warn!(outbox_url, error = %e, "failed to fetch remote outbox");
|
|
return Ok(());
|
|
}
|
|
};
|
|
|
|
let actor_url = url::Url::parse(actor_ap_url)
|
|
.map_err(|e| DomainError::ExternalService(e.to_string()))?;
|
|
|
|
let author_id = self.ap_repo.intern_remote_actor(&actor_url).await?;
|
|
|
|
// Resolve and cache display info so thought cards show proper names.
|
|
let profiles = self
|
|
.federation_action
|
|
.resolve_actor_profiles(vec![actor_ap_url.clone()])
|
|
.await;
|
|
if let Some(profile) = profiles.into_iter().next() {
|
|
let _ = self
|
|
.ap_repo
|
|
.update_remote_actor_display(
|
|
&author_id,
|
|
profile.display_name.as_deref(),
|
|
profile.avatar_url.as_deref(),
|
|
)
|
|
.await;
|
|
}
|
|
|
|
for note in notes {
|
|
let ap_id = match url::Url::parse(¬e.ap_id) {
|
|
Ok(u) => u,
|
|
Err(_) => continue,
|
|
};
|
|
let _ = self
|
|
.ap_repo
|
|
.accept_note(
|
|
&ap_id,
|
|
&author_id,
|
|
¬e.content,
|
|
note.published,
|
|
note.sensitive,
|
|
note.content_warning,
|
|
"public",
|
|
)
|
|
.await;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
DomainEvent::FetchActorConnections {
|
|
actor_ap_url,
|
|
collection_url,
|
|
connection_type,
|
|
page,
|
|
} => {
|
|
let urls = match self
|
|
.federation_action
|
|
.fetch_actor_urls_from_collection(collection_url)
|
|
.await
|
|
{
|
|
Ok(u) => u,
|
|
Err(e) => {
|
|
tracing::warn!(
|
|
collection_url,
|
|
error = %e,
|
|
"failed to fetch actor connections collection"
|
|
);
|
|
return Ok(());
|
|
}
|
|
};
|
|
|
|
if urls.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
let summaries = self.federation_action.resolve_actor_profiles(urls).await;
|
|
|
|
if summaries.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
tracing::info!(
|
|
count = summaries.len(),
|
|
connection_type,
|
|
actor = actor_ap_url,
|
|
"caching actor connections"
|
|
);
|
|
|
|
self.remote_actor_connections
|
|
.upsert_connections(actor_ap_url, connection_type, *page, &summaries)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
_ => Ok(()),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use async_trait::async_trait;
|
|
use domain::{
|
|
errors::DomainError,
|
|
events::DomainEvent,
|
|
models::thought::{Thought, Visibility},
|
|
models::user::User,
|
|
ports::{ActivityPubRepository, OutboundFederationPort},
|
|
testing::TestStore,
|
|
value_objects::*,
|
|
};
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
// ── Spy port ─────────────────────────────────────────────────────────────
|
|
|
|
#[derive(Default)]
|
|
struct SpyPort {
|
|
created: Mutex<Vec<ThoughtId>>,
|
|
deleted: Mutex<Vec<String>>,
|
|
updated: Mutex<Vec<ThoughtId>>,
|
|
announced: Mutex<Vec<String>>,
|
|
undo_announced: Mutex<Vec<String>>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl OutboundFederationPort for SpyPort {
|
|
async fn broadcast_create(
|
|
&self,
|
|
_: &UserId,
|
|
thought: &Thought,
|
|
_: &str,
|
|
) -> Result<(), DomainError> {
|
|
self.created.lock().unwrap().push(thought.id.clone());
|
|
Ok(())
|
|
}
|
|
async fn broadcast_delete(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
|
|
self.deleted.lock().unwrap().push(ap_id.to_string());
|
|
Ok(())
|
|
}
|
|
async fn broadcast_update(
|
|
&self,
|
|
_: &UserId,
|
|
thought: &Thought,
|
|
_: &str,
|
|
) -> Result<(), DomainError> {
|
|
self.updated.lock().unwrap().push(thought.id.clone());
|
|
Ok(())
|
|
}
|
|
async fn broadcast_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
|
|
self.announced.lock().unwrap().push(ap_id.to_string());
|
|
Ok(())
|
|
}
|
|
async fn broadcast_undo_announce(
|
|
&self,
|
|
_: &UserId,
|
|
ap_id: &str,
|
|
) -> Result<(), DomainError> {
|
|
self.undo_announced.lock().unwrap().push(ap_id.to_string());
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn alice() -> User {
|
|
User::new_local(
|
|
UserId::new(),
|
|
Username::new("alice").unwrap(),
|
|
Email::new("alice@ex.com").unwrap(),
|
|
PasswordHash("h".into()),
|
|
)
|
|
}
|
|
|
|
fn local_thought(author_id: UserId) -> Thought {
|
|
Thought::new_local(
|
|
ThoughtId::new(),
|
|
author_id,
|
|
Content::new_local("hello").unwrap(),
|
|
None,
|
|
Visibility::Public,
|
|
None,
|
|
false,
|
|
)
|
|
}
|
|
|
|
fn svc(store: &TestStore, spy: Arc<SpyPort>) -> FederationEventService {
|
|
FederationEventService {
|
|
thoughts: Arc::new(store.clone()),
|
|
users: Arc::new(store.clone()),
|
|
ap: spy,
|
|
base_url: "https://example.com".to_string(),
|
|
federation_action: Arc::new(store.clone()),
|
|
ap_repo: Arc::new(store.clone()),
|
|
remote_actor_connections: Arc::new(store.clone()),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thought_created_broadcasts_create() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let thought = local_thought(alice.id.clone());
|
|
store.users.lock().unwrap().push(alice.clone());
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::ThoughtCreated {
|
|
thought_id: thought.id.clone(),
|
|
user_id: alice.id.clone(),
|
|
in_reply_to_id: None,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(spy.created.lock().unwrap().len(), 1);
|
|
assert_eq!(spy.created.lock().unwrap()[0], thought.id);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_thought_created_does_not_broadcast() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
// Remote thought: local = false, ap_id = Some(...)
|
|
let mut thought = local_thought(alice.id.clone());
|
|
thought.local = false;
|
|
thought.ap_id = Some("https://remote.example/notes/1".into());
|
|
store.users.lock().unwrap().push(alice.clone());
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::ThoughtCreated {
|
|
thought_id: thought.id.clone(),
|
|
user_id: alice.id.clone(),
|
|
in_reply_to_id: None,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert!(spy.created.lock().unwrap().is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thought_deleted_broadcasts_delete_with_constructed_ap_id() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let tid = ThoughtId::new();
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::ThoughtDeleted {
|
|
thought_id: tid.clone(),
|
|
user_id: alice.id.clone(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let deleted = spy.deleted.lock().unwrap();
|
|
assert_eq!(deleted.len(), 1);
|
|
assert_eq!(deleted[0], format!("https://example.com/thoughts/{}", tid));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thought_updated_broadcasts_update() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let thought = local_thought(alice.id.clone());
|
|
store.users.lock().unwrap().push(alice.clone());
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::ThoughtUpdated {
|
|
thought_id: thought.id.clone(),
|
|
user_id: alice.id.clone(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(spy.updated.lock().unwrap().len(), 1);
|
|
assert_eq!(spy.updated.lock().unwrap()[0], thought.id);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn boost_of_local_thought_announces_constructed_url() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let thought = local_thought(alice.id.clone()); // ap_id = None
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::BoostAdded {
|
|
boost_id: BoostId::new(),
|
|
user_id: alice.id.clone(),
|
|
thought_id: thought.id.clone(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let announced = spy.announced.lock().unwrap();
|
|
assert_eq!(announced.len(), 1);
|
|
assert_eq!(
|
|
announced[0],
|
|
format!("https://example.com/thoughts/{}", thought.id)
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn boost_of_remote_thought_announces_remote_ap_id() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let mut thought = local_thought(alice.id.clone());
|
|
thought.local = false;
|
|
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/123".into());
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::BoostAdded {
|
|
boost_id: BoostId::new(),
|
|
user_id: alice.id.clone(),
|
|
thought_id: thought.id.clone(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let announced = spy.announced.lock().unwrap();
|
|
assert_eq!(
|
|
announced[0],
|
|
"https://mastodon.social/users/bob/statuses/123"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn direct_thought_created_does_not_broadcast() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let thought = Thought::new_local(
|
|
ThoughtId::new(),
|
|
alice.id.clone(),
|
|
Content::new_local("private").unwrap(),
|
|
None,
|
|
Visibility::Direct,
|
|
None,
|
|
false,
|
|
);
|
|
store.users.lock().unwrap().push(alice.clone());
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::ThoughtCreated {
|
|
thought_id: thought.id.clone(),
|
|
user_id: alice.id.clone(),
|
|
in_reply_to_id: None,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert!(spy.created.lock().unwrap().is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn followers_only_thought_does_not_broadcast_publicly() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let thought = Thought::new_local(
|
|
ThoughtId::new(),
|
|
alice.id.clone(),
|
|
Content::new_local("for followers").unwrap(),
|
|
None,
|
|
Visibility::Followers,
|
|
None,
|
|
false,
|
|
);
|
|
store.users.lock().unwrap().push(alice.clone());
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::ThoughtCreated {
|
|
thought_id: thought.id.clone(),
|
|
user_id: alice.id.clone(),
|
|
in_reply_to_id: None,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert!(spy.created.lock().unwrap().is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn unrelated_events_are_noop() {
|
|
let store = TestStore::default();
|
|
let spy = Arc::new(SpyPort::default());
|
|
let svc = svc(&store, spy.clone());
|
|
|
|
svc.process(&DomainEvent::UserBlocked {
|
|
blocker_id: UserId::new(),
|
|
blocked_id: UserId::new(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert!(spy.created.lock().unwrap().is_empty());
|
|
assert!(spy.deleted.lock().unwrap().is_empty());
|
|
assert!(spy.updated.lock().unwrap().is_empty());
|
|
assert!(spy.announced.lock().unwrap().is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thought_created_does_not_broadcast_if_user_missing() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let thought = local_thought(alice.id.clone());
|
|
// Don't push alice into users — simulates user deleted before handler runs
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::ThoughtCreated {
|
|
thought_id: thought.id.clone(),
|
|
user_id: alice.id.clone(),
|
|
in_reply_to_id: None,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert!(spy.created.lock().unwrap().is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn boost_removed_sends_undo_announce_for_local_thought() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let thought = local_thought(alice.id.clone()); // ap_id = None → constructed URL
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::BoostRemoved {
|
|
user_id: alice.id.clone(),
|
|
thought_id: thought.id.clone(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let undo_announced = spy.undo_announced.lock().unwrap();
|
|
assert_eq!(undo_announced.len(), 1);
|
|
assert_eq!(
|
|
undo_announced[0],
|
|
format!("https://example.com/thoughts/{}", thought.id)
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn boost_removed_sends_undo_announce_for_remote_thought() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let mut thought = local_thought(alice.id.clone());
|
|
thought.local = false;
|
|
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/456".into());
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::BoostRemoved {
|
|
user_id: alice.id.clone(),
|
|
thought_id: thought.id.clone(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let undo_announced = spy.undo_announced.lock().unwrap();
|
|
assert_eq!(undo_announced.len(), 1);
|
|
assert_eq!(
|
|
undo_announced[0],
|
|
"https://mastodon.social/users/bob/statuses/456"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn boost_removed_does_not_broadcast_if_thought_missing() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::BoostRemoved {
|
|
user_id: alice.id.clone(),
|
|
thought_id: ThoughtId::new(), // doesn't exist in store
|
|
})
|
|
.await
|
|
.unwrap();
|
|
assert!(spy.undo_announced.lock().unwrap().is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thought_updated_does_not_broadcast_if_user_missing() {
|
|
let store = TestStore::default();
|
|
let alice = alice();
|
|
let thought = local_thought(alice.id.clone());
|
|
// Don't push alice into users
|
|
store.thoughts.lock().unwrap().push(thought.clone());
|
|
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::ThoughtUpdated {
|
|
thought_id: thought.id.clone(),
|
|
user_id: alice.id.clone(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert!(spy.updated.lock().unwrap().is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn fetch_remote_actor_posts_is_noop_when_outbox_empty() {
|
|
let store = TestStore::default();
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::FetchRemoteActorPosts {
|
|
actor_ap_url: "https://mastodon.social/users/alice".into(),
|
|
outbox_url: "https://mastodon.social/users/alice/outbox".into(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
// TestStore.fetch_outbox_page returns Ok(vec![]) — no notes, no error
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn fetch_actor_connections_is_noop_when_collection_empty() {
|
|
let store = TestStore::default();
|
|
let spy = Arc::new(SpyPort::default());
|
|
svc(&store, spy.clone())
|
|
.process(&DomainEvent::FetchActorConnections {
|
|
actor_ap_url: "https://mastodon.social/users/alice".into(),
|
|
collection_url: "https://mastodon.social/users/alice/followers".into(),
|
|
connection_type: "followers".into(),
|
|
page: 1,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
}
|
|
}
|