Compare commits
14 Commits
114d9f9558
...
ed744046f4
| Author | SHA1 | Date | |
|---|---|---|---|
| ed744046f4 | |||
| 931894d77a | |||
| 2485869af6 | |||
| b0b3c6a59b | |||
| eaf079069f | |||
| a37c877172 | |||
| 904916d4c1 | |||
| 057fc29abc | |||
| 1fa8389a69 | |||
| 83e87e644b | |||
| 13282fc88e | |||
| 10605bbf2f | |||
| 2d742bdbe3 | |||
| 925856f6b8 |
@@ -439,6 +439,10 @@ pub struct AnnounceActivity {
|
||||
pub(crate) actor: ObjectId<DbActor>,
|
||||
pub(crate) object: Url,
|
||||
pub(crate) published: Option<chrono::DateTime<chrono::Utc>>,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub(crate) to: Vec<String>,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub(crate) cc: Vec<String>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -30,6 +30,34 @@ use crate::{
|
||||
webfinger::webfinger_handler,
|
||||
};
|
||||
|
||||
fn thought_note_json(
|
||||
thought: &domain::models::thought::Thought,
|
||||
local_actor: &crate::actors::DbActor,
|
||||
base_url: &str,
|
||||
) -> anyhow::Result<(url::Url, serde_json::Value)> {
|
||||
let ap_id = url::Url::parse(&format!("{}/thoughts/{}", base_url, thought.id))?;
|
||||
let mut note = serde_json::json!({
|
||||
"type": "Note",
|
||||
"id": ap_id.to_string(),
|
||||
"attributedTo": local_actor.ap_id.to_string(),
|
||||
"content": thought.content.as_str(),
|
||||
"published": thought.created_at.to_rfc3339(),
|
||||
"to": [crate::urls::AS_PUBLIC],
|
||||
"cc": [local_actor.followers_url.to_string()],
|
||||
"sensitive": thought.sensitive,
|
||||
});
|
||||
if let Some(ref cw) = thought.content_warning {
|
||||
note["summary"] = serde_json::json!(cw);
|
||||
}
|
||||
if let Some(ref reply_url) = thought.in_reply_to_url {
|
||||
note["inReplyTo"] = serde_json::json!(reply_url);
|
||||
}
|
||||
if let Some(updated_at) = thought.updated_at {
|
||||
note["updated"] = serde_json::json!(updated_at.to_rfc3339());
|
||||
}
|
||||
Ok((ap_id, note))
|
||||
}
|
||||
|
||||
fn collect_inboxes(followers: &[crate::repository::Follower]) -> Vec<Url> {
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
let mut inboxes = Vec::new();
|
||||
@@ -113,6 +141,45 @@ impl ActivityPubService {
|
||||
self.federation_config.to_request_data()
|
||||
}
|
||||
|
||||
/// Returns `(local_actor, deduplicated_inboxes)` for all accepted followers,
|
||||
/// excluding blocked actors and blocked domains.
|
||||
/// Returns `None` if there are no eligible followers.
|
||||
async fn accepted_follower_inboxes(
|
||||
&self,
|
||||
data: &activitypub_federation::config::Data<FederationData>,
|
||||
local_user_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Option<(crate::actors::DbActor, Vec<Url>)>> {
|
||||
let local_actor = get_local_actor(local_user_id, data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default();
|
||||
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
|
||||
let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default();
|
||||
let blocked_domain_set: std::collections::HashSet<String> =
|
||||
blocked_domains.into_iter().map(|d| d.domain).collect();
|
||||
|
||||
let accepted: Vec<_> = followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.filter(|f| !blocked_set.contains(&f.actor.url))
|
||||
.filter(|f| {
|
||||
let domain = url::Url::parse(&f.actor.inbox_url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
!blocked_domain_set.contains(&domain)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if accepted.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some((local_actor, collect_inboxes(&accepted))))
|
||||
}
|
||||
|
||||
pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result<String> {
|
||||
use activitypub_federation::traits::Object;
|
||||
let uuid = uuid::Uuid::parse_str(user_id_str)?;
|
||||
@@ -140,6 +207,102 @@ impl ActivityPubService {
|
||||
.layer(self.federation_config.middleware())
|
||||
}
|
||||
|
||||
/// Fan out an Announce activity to all accepted followers.
|
||||
pub async fn broadcast_announce_to_followers(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
object_ap_id: url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
// Deterministic ID so Undo(Announce) can reference this same activity.
|
||||
let announce_id = url::Url::parse(&format!(
|
||||
"{}/activities/announce/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v5(
|
||||
&uuid::Uuid::NAMESPACE_URL,
|
||||
format!("{}/{}", local_user_id, object_ap_id).as_bytes(),
|
||||
)
|
||||
))
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let data = self.federation_config.to_request_data();
|
||||
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let announce = crate::activities::AnnounceActivity {
|
||||
id: announce_id,
|
||||
kind: Default::default(),
|
||||
actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
|
||||
object: object_ap_id,
|
||||
published: Some(chrono::Utc::now()),
|
||||
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
||||
cc: vec![local_actor.followers_url.to_string()],
|
||||
};
|
||||
|
||||
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||
&activitypub_federation::protocol::context::WithContext::new_default(announce),
|
||||
&local_actor,
|
||||
inboxes,
|
||||
&data,
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some Announce deliveries failed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fan out an Undo(Announce) activity to all accepted followers.
|
||||
pub async fn broadcast_undo_announce_to_followers(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
object_ap_id: url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
// Reconstruct the same deterministic announce ID used when the boost was sent.
|
||||
let announce_id = url::Url::parse(&format!(
|
||||
"{}/activities/announce/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v5(
|
||||
&uuid::Uuid::NAMESPACE_URL,
|
||||
format!("{}/{}", local_user_id, object_ap_id).as_bytes(),
|
||||
)
|
||||
))
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let undo_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let data = self.federation_config.to_request_data();
|
||||
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let undo = crate::activities::UndoActivity {
|
||||
id: undo_id,
|
||||
kind: Default::default(),
|
||||
actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
|
||||
object: serde_json::json!({
|
||||
"type": "Announce",
|
||||
"id": announce_id.to_string(),
|
||||
"actor": local_actor.ap_id.to_string(),
|
||||
"object": object_ap_id.to_string(),
|
||||
}),
|
||||
};
|
||||
|
||||
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||
&activitypub_federation::protocol::context::WithContext::new_default(undo),
|
||||
&local_actor,
|
||||
inboxes,
|
||||
&data,
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some Undo(Announce) deliveries failed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
|
||||
@@ -450,75 +613,6 @@ impl ActivityPubService {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Broadcast a single object to all accepted followers as a Create activity.
|
||||
/// Called by project-specific event handlers when new content is created.
|
||||
pub async fn broadcast_to_followers(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
ap_id: Url,
|
||||
object: serde_json::Value,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(local_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
let blocked = data
|
||||
.federation_repo
|
||||
.get_blocked_actors(local_user_id)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
|
||||
let blocked_domains = data
|
||||
.federation_repo
|
||||
.get_blocked_domains()
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_domain_set: std::collections::HashSet<String> =
|
||||
blocked_domains.into_iter().map(|d| d.domain).collect();
|
||||
let accepted: Vec<_> = followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.filter(|f| !blocked_set.contains(&f.actor.url))
|
||||
.filter(|f| {
|
||||
let domain = url::Url::parse(&f.actor.inbox_url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
!blocked_domain_set.contains(&domain)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if accepted.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let create = CreateActivity {
|
||||
id: ap_id.clone(),
|
||||
kind: Default::default(),
|
||||
actor: ObjectId::from(local_actor.ap_id.clone()),
|
||||
object,
|
||||
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
||||
cc: vec![local_actor.followers_url.to_string()],
|
||||
};
|
||||
let create_with_ctx = WithContext::new_default(create);
|
||||
|
||||
let inboxes = collect_inboxes(&accepted);
|
||||
|
||||
let sends =
|
||||
SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(
|
||||
count = failures.len(),
|
||||
"some activity deliveries failed permanently"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Broadcast a Delete activity to all accepted followers for a removed review.
|
||||
pub async fn broadcast_delete_to_followers(
|
||||
&self,
|
||||
@@ -526,40 +620,9 @@ impl ActivityPubService {
|
||||
ap_id: Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(local_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
let blocked = data
|
||||
.federation_repo
|
||||
.get_blocked_actors(local_user_id)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
|
||||
let blocked_domains = data
|
||||
.federation_repo
|
||||
.get_blocked_domains()
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_domain_set: std::collections::HashSet<String> =
|
||||
blocked_domains.into_iter().map(|d| d.domain).collect();
|
||||
let accepted: Vec<_> = followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.filter(|f| !blocked_set.contains(&f.actor.url))
|
||||
.filter(|f| {
|
||||
let domain = url::Url::parse(&f.actor.inbox_url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
!blocked_domain_set.contains(&domain)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if accepted.is_empty() {
|
||||
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let delete_id =
|
||||
crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
@@ -572,7 +635,6 @@ impl ActivityPubService {
|
||||
cc: vec![local_actor.followers_url.to_string()],
|
||||
};
|
||||
let delete_with_ctx = WithContext::new_default(delete);
|
||||
let inboxes = collect_inboxes(&accepted);
|
||||
let sends =
|
||||
SendActivityTask::prepare(&delete_with_ctx, &local_actor, inboxes, &data).await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
@@ -593,40 +655,9 @@ impl ActivityPubService {
|
||||
object: serde_json::Value,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(local_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
let blocked = data
|
||||
.federation_repo
|
||||
.get_blocked_actors(local_user_id)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
|
||||
let blocked_domains = data
|
||||
.federation_repo
|
||||
.get_blocked_domains()
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_domain_set: std::collections::HashSet<String> =
|
||||
blocked_domains.into_iter().map(|d| d.domain).collect();
|
||||
let accepted: Vec<_> = followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.filter(|f| !blocked_set.contains(&f.actor.url))
|
||||
.filter(|f| {
|
||||
let domain = url::Url::parse(&f.actor.inbox_url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
!blocked_domain_set.contains(&domain)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if accepted.is_empty() {
|
||||
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let add = crate::activities::AddActivity {
|
||||
id: ap_id,
|
||||
@@ -637,7 +668,6 @@ impl ActivityPubService {
|
||||
cc: vec![local_actor.followers_url.to_string()],
|
||||
};
|
||||
let add_with_ctx = WithContext::new_default(add);
|
||||
let inboxes = collect_inboxes(&accepted);
|
||||
let sends = SendActivityTask::prepare(&add_with_ctx, &local_actor, inboxes, &data).await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
@@ -653,40 +683,9 @@ impl ActivityPubService {
|
||||
watchlist_entry_ap_id: Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(local_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
let blocked = data
|
||||
.federation_repo
|
||||
.get_blocked_actors(local_user_id)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
|
||||
let blocked_domains = data
|
||||
.federation_repo
|
||||
.get_blocked_domains()
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_domain_set: std::collections::HashSet<String> =
|
||||
blocked_domains.into_iter().map(|d| d.domain).collect();
|
||||
let accepted: Vec<_> = followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.filter(|f| !blocked_set.contains(&f.actor.url))
|
||||
.filter(|f| {
|
||||
let domain = url::Url::parse(&f.actor.inbox_url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
!blocked_domain_set.contains(&domain)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if accepted.is_empty() {
|
||||
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let undo_id =
|
||||
crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
@@ -701,7 +700,6 @@ impl ActivityPubService {
|
||||
}),
|
||||
};
|
||||
let undo_with_ctx = WithContext::new_default(undo);
|
||||
let inboxes = collect_inboxes(&accepted);
|
||||
let sends = SendActivityTask::prepare(&undo_with_ctx, &local_actor, inboxes, &data).await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
@@ -710,75 +708,6 @@ impl ActivityPubService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Broadcast an Update(Note) activity to all accepted followers for an edited review.
|
||||
pub async fn broadcast_update_to_followers(
|
||||
&self,
|
||||
local_user_id: uuid::Uuid,
|
||||
object: serde_json::Value,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(local_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
let blocked = data
|
||||
.federation_repo
|
||||
.get_blocked_actors(local_user_id)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
|
||||
let blocked_domains = data
|
||||
.federation_repo
|
||||
.get_blocked_domains()
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let blocked_domain_set: std::collections::HashSet<String> =
|
||||
blocked_domains.into_iter().map(|d| d.domain).collect();
|
||||
let accepted: Vec<_> = followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.filter(|f| !blocked_set.contains(&f.actor.url))
|
||||
.filter(|f| {
|
||||
let domain = url::Url::parse(&f.actor.inbox_url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
!blocked_domain_set.contains(&domain)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if accepted.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let update_id = Url::parse(&format!(
|
||||
"{}/activities/update/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v4()
|
||||
))?;
|
||||
let update = crate::activities::UpdateActivity {
|
||||
id: update_id,
|
||||
kind: Default::default(),
|
||||
actor: ObjectId::from(local_actor.ap_id.clone()),
|
||||
object,
|
||||
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
||||
cc: vec![local_actor.followers_url.to_string()],
|
||||
};
|
||||
let update_with_ctx = WithContext::new_default(update);
|
||||
let inboxes = collect_inboxes(&accepted);
|
||||
let sends =
|
||||
SendActivityTask::prepare(&update_with_ctx, &local_actor, inboxes, &data).await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(
|
||||
count = failures.len(),
|
||||
"some update activity deliveries failed"
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> {
|
||||
use activitypub_federation::traits::Object;
|
||||
|
||||
@@ -1216,6 +1145,142 @@ impl ActivityPubService {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl domain::ports::OutboundFederationPort for ActivityPubService {
|
||||
// Actor identity (ap_id, followers_url) comes from federation config via get_local_actor.
|
||||
// author_username is provided by the caller but not needed here.
|
||||
async fn broadcast_create(
|
||||
&self,
|
||||
author_user_id: &domain::value_objects::UserId,
|
||||
thought: &domain::models::thought::Thought,
|
||||
_author_username: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let user_uuid = author_user_id.as_uuid();
|
||||
let data = self.federation_config.to_request_data();
|
||||
let Some((local_actor, inboxes)) = self
|
||||
.accepted_follower_inboxes(&data, user_uuid)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let (ap_id, note) = thought_note_json(thought, &local_actor, &self.base_url)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
|
||||
let create = crate::activities::CreateActivity {
|
||||
id: ap_id,
|
||||
kind: Default::default(),
|
||||
actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
|
||||
object: note,
|
||||
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
||||
cc: vec![local_actor.followers_url.to_string()],
|
||||
};
|
||||
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||
&activitypub_federation::protocol::context::WithContext::new_default(create),
|
||||
&local_actor,
|
||||
inboxes,
|
||||
&data,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some Create deliveries failed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn broadcast_delete(
|
||||
&self,
|
||||
author_user_id: &domain::value_objects::UserId,
|
||||
thought_ap_id: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let user_uuid = author_user_id.as_uuid();
|
||||
let ap_id = url::Url::parse(thought_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_delete_to_followers(user_uuid, ap_id)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
// Actor identity (ap_id, followers_url) comes from federation config via get_local_actor.
|
||||
// author_username is provided by the caller but not needed here.
|
||||
async fn broadcast_update(
|
||||
&self,
|
||||
author_user_id: &domain::value_objects::UserId,
|
||||
thought: &domain::models::thought::Thought,
|
||||
_author_username: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let user_uuid = author_user_id.as_uuid();
|
||||
let data = self.federation_config.to_request_data();
|
||||
let Some((local_actor, inboxes)) = self
|
||||
.accepted_follower_inboxes(&data, user_uuid)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let (_ap_id, note) = thought_note_json(thought, &local_actor, &self.base_url)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
|
||||
let update_id = url::Url::parse(&format!(
|
||||
"{}/activities/update/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v4()
|
||||
))
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let update = crate::activities::UpdateActivity {
|
||||
id: update_id,
|
||||
kind: Default::default(),
|
||||
actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
|
||||
object: note,
|
||||
to: vec![crate::urls::AS_PUBLIC.to_string()],
|
||||
cc: vec![local_actor.followers_url.to_string()],
|
||||
};
|
||||
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||
&activitypub_federation::protocol::context::WithContext::new_default(update),
|
||||
&local_actor,
|
||||
inboxes,
|
||||
&data,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some Update deliveries failed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn broadcast_announce(
|
||||
&self,
|
||||
booster_user_id: &domain::value_objects::UserId,
|
||||
object_ap_id: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let user_uuid = booster_user_id.as_uuid();
|
||||
let ap_id = url::Url::parse(object_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_announce_to_followers(user_uuid, ap_id)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
async fn broadcast_undo_announce(
|
||||
&self,
|
||||
booster_user_id: &domain::value_objects::UserId,
|
||||
object_ap_id: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let user_uuid = booster_user_id.as_uuid();
|
||||
let ap_id = url::Url::parse(object_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_undo_announce_to_followers(user_uuid, ap_id)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "tests/service.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
pub mod services;
|
||||
pub mod use_cases;
|
||||
|
||||
451
crates/application/src/services/federation_event.rs
Normal file
451
crates/application/src/services/federation_event.rs
Normal file
@@ -0,0 +1,451 @@
|
||||
use std::sync::Arc;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::thought::Visibility,
|
||||
ports::{OutboundFederationPort, ThoughtRepository, UserRepository},
|
||||
};
|
||||
|
||||
pub struct FederationEventService {
|
||||
pub thoughts: Arc<dyn ThoughtRepository>,
|
||||
pub users: Arc<dyn UserRepository>,
|
||||
pub ap: Arc<dyn OutboundFederationPort>,
|
||||
pub base_url: String,
|
||||
}
|
||||
|
||||
impl FederationEventService {
|
||||
pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
match event {
|
||||
DomainEvent::ThoughtCreated { thought_id, user_id, .. } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.local && matches!(t.visibility, Visibility::Public | Visibility::Unlisted) => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let user = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
self.ap.broadcast_create(user_id, &thought, user.username.as_str()).await
|
||||
}
|
||||
|
||||
DomainEvent::ThoughtDeleted { thought_id, user_id } => {
|
||||
// No DB lookup — thought is already deleted when this event fires.
|
||||
// No locality guard: delete commands only reach local thoughts via the use case.
|
||||
let ap_id = format!("{}/thoughts/{}", self.base_url, thought_id);
|
||||
self.ap.broadcast_delete(user_id, &ap_id).await
|
||||
}
|
||||
|
||||
DomainEvent::ThoughtUpdated { thought_id, user_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.local && matches!(t.visibility, Visibility::Public | Visibility::Unlisted) => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let user = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
self.ap.broadcast_update(user_id, &thought, user.username.as_str()).await
|
||||
}
|
||||
|
||||
DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| {
|
||||
format!("{}/thoughts/{}", self.base_url, thought_id)
|
||||
});
|
||||
self.ap.broadcast_announce(user_id, &object_ap_id).await
|
||||
}
|
||||
|
||||
DomainEvent::BoostRemoved { user_id, thought_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| {
|
||||
format!("{}/thoughts/{}", self.base_url, thought_id)
|
||||
});
|
||||
self.ap.broadcast_undo_announce(user_id, &object_ap_id).await
|
||||
}
|
||||
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::thought::{Thought, Visibility},
|
||||
models::user::User,
|
||||
ports::OutboundFederationPort,
|
||||
testing::TestStore,
|
||||
value_objects::*,
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
// ── Spy port ─────────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Default)]
|
||||
struct SpyPort {
|
||||
created: Mutex<Vec<ThoughtId>>,
|
||||
deleted: Mutex<Vec<String>>,
|
||||
updated: Mutex<Vec<ThoughtId>>,
|
||||
announced: Mutex<Vec<String>>,
|
||||
undo_announced: Mutex<Vec<String>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl OutboundFederationPort for SpyPort {
|
||||
async fn broadcast_create(&self, _: &UserId, thought: &Thought, _: &str) -> Result<(), DomainError> {
|
||||
self.created.lock().unwrap().push(thought.id.clone());
|
||||
Ok(())
|
||||
}
|
||||
async fn broadcast_delete(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
|
||||
self.deleted.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
async fn broadcast_update(&self, _: &UserId, thought: &Thought, _: &str) -> Result<(), DomainError> {
|
||||
self.updated.lock().unwrap().push(thought.id.clone());
|
||||
Ok(())
|
||||
}
|
||||
async fn broadcast_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
|
||||
self.announced.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
async fn broadcast_undo_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
|
||||
self.undo_announced.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn alice() -> User {
|
||||
User::new_local(
|
||||
UserId::new(),
|
||||
Username::new("alice").unwrap(),
|
||||
Email::new("alice@ex.com").unwrap(),
|
||||
PasswordHash("h".into()),
|
||||
)
|
||||
}
|
||||
|
||||
fn local_thought(author_id: UserId) -> Thought {
|
||||
Thought::new_local(
|
||||
ThoughtId::new(), author_id,
|
||||
Content::new_local("hello").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
)
|
||||
}
|
||||
|
||||
fn svc(store: &TestStore, spy: Arc<SpyPort>) -> FederationEventService {
|
||||
FederationEventService {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
users: Arc::new(store.clone()),
|
||||
ap: spy,
|
||||
base_url: "https://example.com".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_created_broadcasts_create() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone());
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(spy.created.lock().unwrap().len(), 1);
|
||||
assert_eq!(spy.created.lock().unwrap()[0], thought.id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_thought_created_does_not_broadcast() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
// Remote thought: local = false, ap_id = Some(...)
|
||||
let mut thought = local_thought(alice.id.clone());
|
||||
thought.local = false;
|
||||
thought.ap_id = Some("https://remote.example/notes/1".into());
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.created.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_deleted_broadcasts_delete_with_constructed_ap_id() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let tid = ThoughtId::new();
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtDeleted {
|
||||
thought_id: tid.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let deleted = spy.deleted.lock().unwrap();
|
||||
assert_eq!(deleted.len(), 1);
|
||||
assert_eq!(deleted[0], format!("https://example.com/thoughts/{}", tid));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_updated_broadcasts_update() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone());
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtUpdated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(spy.updated.lock().unwrap().len(), 1);
|
||||
assert_eq!(spy.updated.lock().unwrap()[0], thought.id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_of_local_thought_announces_constructed_url() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone()); // ap_id = None
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::BoostAdded {
|
||||
boost_id: BoostId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let announced = spy.announced.lock().unwrap();
|
||||
assert_eq!(announced.len(), 1);
|
||||
assert_eq!(announced[0], format!("https://example.com/thoughts/{}", thought.id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_of_remote_thought_announces_remote_ap_id() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let mut thought = local_thought(alice.id.clone());
|
||||
thought.local = false;
|
||||
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/123".into());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::BoostAdded {
|
||||
boost_id: BoostId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let announced = spy.announced.lock().unwrap();
|
||||
assert_eq!(announced[0], "https://mastodon.social/users/bob/statuses/123");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn direct_thought_created_does_not_broadcast() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("private").unwrap(),
|
||||
None, Visibility::Direct, None, false,
|
||||
);
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.created.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn followers_only_thought_does_not_broadcast_publicly() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("for followers").unwrap(),
|
||||
None, Visibility::Followers, None, false,
|
||||
);
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.created.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unrelated_events_are_noop() {
|
||||
let store = TestStore::default();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
let svc = svc(&store, spy.clone());
|
||||
|
||||
svc.process(&DomainEvent::UserBlocked {
|
||||
blocker_id: UserId::new(),
|
||||
blocked_id: UserId::new(),
|
||||
}).await.unwrap();
|
||||
|
||||
assert!(spy.created.lock().unwrap().is_empty());
|
||||
assert!(spy.deleted.lock().unwrap().is_empty());
|
||||
assert!(spy.updated.lock().unwrap().is_empty());
|
||||
assert!(spy.announced.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_created_does_not_broadcast_if_user_missing() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone());
|
||||
// Don't push alice into users — simulates user deleted before handler runs
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.created.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_removed_sends_undo_announce_for_local_thought() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone()); // ap_id = None → constructed URL
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::BoostRemoved {
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let undo_announced = spy.undo_announced.lock().unwrap();
|
||||
assert_eq!(undo_announced.len(), 1);
|
||||
assert_eq!(undo_announced[0], format!("https://example.com/thoughts/{}", thought.id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_removed_sends_undo_announce_for_remote_thought() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let mut thought = local_thought(alice.id.clone());
|
||||
thought.local = false;
|
||||
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/456".into());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::BoostRemoved {
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let undo_announced = spy.undo_announced.lock().unwrap();
|
||||
assert_eq!(undo_announced.len(), 1);
|
||||
assert_eq!(undo_announced[0], "https://mastodon.social/users/bob/statuses/456");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_removed_does_not_broadcast_if_thought_missing() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::BoostRemoved {
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: ThoughtId::new(), // doesn't exist in store
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(spy.undo_announced.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_updated_does_not_broadcast_if_user_missing() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = local_thought(alice.id.clone());
|
||||
// Don't push alice into users
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
svc(&store, spy.clone())
|
||||
.process(&DomainEvent::ThoughtUpdated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: alice.id.clone(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.updated.lock().unwrap().is_empty());
|
||||
}
|
||||
}
|
||||
5
crates/application/src/services/mod.rs
Normal file
5
crates/application/src/services/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod federation_event;
|
||||
pub mod notification_event;
|
||||
|
||||
pub use federation_event::FederationEventService;
|
||||
pub use notification_event::NotificationEventService;
|
||||
239
crates/application/src/services/notification_event.rs
Normal file
239
crates/application/src/services/notification_event.rs
Normal file
@@ -0,0 +1,239 @@
|
||||
use std::sync::Arc;
|
||||
use chrono::Utc;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::notification::{Notification, NotificationType},
|
||||
ports::{NotificationRepository, ThoughtRepository},
|
||||
value_objects::NotificationId,
|
||||
};
|
||||
|
||||
pub struct NotificationEventService {
|
||||
pub thoughts: Arc<dyn ThoughtRepository>,
|
||||
pub notifications: Arc<dyn NotificationRepository>,
|
||||
}
|
||||
|
||||
impl NotificationEventService {
|
||||
pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
match event {
|
||||
DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
if thought.user_id == *user_id { return Ok(()); }
|
||||
self.notifications.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: thought.user_id,
|
||||
notification_type: NotificationType::Like,
|
||||
from_user_id: Some(user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
}).await
|
||||
}
|
||||
DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
if thought.user_id == *user_id { return Ok(()); }
|
||||
self.notifications.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: thought.user_id,
|
||||
notification_type: NotificationType::Boost,
|
||||
from_user_id: Some(user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
}).await
|
||||
}
|
||||
DomainEvent::FollowAccepted { follower_id, following_id } => {
|
||||
self.notifications.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: following_id.clone(),
|
||||
notification_type: NotificationType::Follow,
|
||||
from_user_id: Some(follower_id.clone()),
|
||||
thought_id: None,
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
}).await
|
||||
}
|
||||
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => {
|
||||
let reply_to_id = match in_reply_to_id {
|
||||
Some(id) => id,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let original = match self.thoughts.find_by_id(reply_to_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
if original.user_id == *user_id { return Ok(()); }
|
||||
self.notifications.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: original.user_id,
|
||||
notification_type: NotificationType::Reply,
|
||||
from_user_id: Some(user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
}).await
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use domain::{
|
||||
models::{thought::{Thought, Visibility}, user::User},
|
||||
testing::TestStore,
|
||||
value_objects::*,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
fn alice() -> User {
|
||||
User::new_local(
|
||||
UserId::new(),
|
||||
Username::new("alice").unwrap(),
|
||||
Email::new("alice@ex.com").unwrap(),
|
||||
PasswordHash("h".into()),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn like_creates_notification_for_thought_author() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let bob_id = UserId::new();
|
||||
let thought = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("hello").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
let svc = NotificationEventService {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
svc.process(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: bob_id,
|
||||
thought_id: thought.id.clone(),
|
||||
}).await.unwrap();
|
||||
let notifs = store.notifications.lock().unwrap();
|
||||
assert_eq!(notifs.len(), 1);
|
||||
assert!(matches!(notifs[0].notification_type, NotificationType::Like));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn self_like_creates_no_notification() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("hello").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
let svc = NotificationEventService {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
svc.process(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
}).await.unwrap();
|
||||
assert!(store.notifications.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn follow_accepted_creates_notification() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let bob_id = UserId::new();
|
||||
let svc = NotificationEventService {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
svc.process(&DomainEvent::FollowAccepted {
|
||||
follower_id: bob_id,
|
||||
following_id: alice.id.clone(),
|
||||
}).await.unwrap();
|
||||
let notifs = store.notifications.lock().unwrap();
|
||||
assert_eq!(notifs.len(), 1);
|
||||
assert!(matches!(notifs[0].notification_type, NotificationType::Follow));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reply_creates_notification_for_original_author() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let bob_id = UserId::new();
|
||||
let original = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("original").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.thoughts.lock().unwrap().push(original.clone());
|
||||
let svc = NotificationEventService {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
svc.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: ThoughtId::new(),
|
||||
user_id: bob_id,
|
||||
in_reply_to_id: Some(original.id.clone()),
|
||||
}).await.unwrap();
|
||||
let notifs = store.notifications.lock().unwrap();
|
||||
assert_eq!(notifs.len(), 1);
|
||||
assert!(matches!(notifs[0].notification_type, NotificationType::Reply));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn self_reply_creates_no_notification() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let original = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("original").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.thoughts.lock().unwrap().push(original.clone());
|
||||
let svc = NotificationEventService {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
svc.process(&DomainEvent::ThoughtCreated {
|
||||
thought_id: ThoughtId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: Some(original.id.clone()),
|
||||
}).await.unwrap();
|
||||
assert!(store.notifications.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn self_boost_creates_no_notification() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("hello").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
let svc = NotificationEventService {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
svc.process(&DomainEvent::BoostAdded {
|
||||
boost_id: BoostId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
}).await.unwrap();
|
||||
assert!(store.notifications.lock().unwrap().is_empty());
|
||||
}
|
||||
}
|
||||
@@ -233,3 +233,45 @@ pub trait ActivityPubRepository: Send + Sync {
|
||||
/// Total locally-authored thought count for NodeInfo responses.
|
||||
async fn count_local_notes(&self) -> Result<u64, DomainError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait OutboundFederationPort: Send + Sync {
|
||||
/// Fan out a new local Note to all accepted followers.
|
||||
async fn broadcast_create(
|
||||
&self,
|
||||
author_user_id: &UserId,
|
||||
thought: &Thought,
|
||||
author_username: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Fan out a Delete tombstone for a now-deleted local Note.
|
||||
/// `thought_ap_id` is pre-constructed by the caller because the thought
|
||||
/// has already been deleted from the DB when this fires.
|
||||
async fn broadcast_delete(
|
||||
&self,
|
||||
author_user_id: &UserId,
|
||||
thought_ap_id: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Fan out an Update(Note) for an edited local thought.
|
||||
async fn broadcast_update(
|
||||
&self,
|
||||
author_user_id: &UserId,
|
||||
thought: &Thought,
|
||||
author_username: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Fan out an Announce(object_ap_id) for a boost.
|
||||
async fn broadcast_announce(
|
||||
&self,
|
||||
booster_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Fan out an Undo(Announce) to followers when a boost is removed.
|
||||
async fn broadcast_undo_announce(
|
||||
&self,
|
||||
booster_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
@@ -8,21 +8,21 @@ name = "thoughts-worker"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
nats = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
event-transport = { workspace = true }
|
||||
postgres = { workspace = true }
|
||||
async-nats = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
futures = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
domain = { workspace = true }
|
||||
application = { workspace = true }
|
||||
nats = { workspace = true }
|
||||
event-transport = { workspace = true }
|
||||
activitypub-base = { workspace = true }
|
||||
activitypub = { workspace = true }
|
||||
postgres = { workspace = true }
|
||||
postgres-federation = { workspace = true }
|
||||
async-nats = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
futures = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
domain = { workspace = true, features = ["test-helpers"] }
|
||||
|
||||
80
crates/worker/src/factory.rs
Normal file
80
crates/worker/src/factory.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use std::sync::Arc;
|
||||
use sqlx::PgPool;
|
||||
|
||||
use activitypub::ThoughtsObjectHandler;
|
||||
use activitypub_base::ActivityPubService;
|
||||
use application::services::{FederationEventService, NotificationEventService};
|
||||
use postgres::activitypub::PgActivityPubRepository;
|
||||
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
||||
|
||||
use crate::handlers::{FederationHandler, NotificationHandler};
|
||||
|
||||
pub struct WorkerHandlers {
|
||||
pub notification: NotificationHandler,
|
||||
pub federation: FederationHandler,
|
||||
}
|
||||
|
||||
pub async fn build(
|
||||
database_url: &str,
|
||||
base_url: &str,
|
||||
nats_url: &str,
|
||||
) -> (
|
||||
event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
|
||||
WorkerHandlers,
|
||||
) {
|
||||
let pool = PgPool::connect(database_url)
|
||||
.await
|
||||
.expect("DB connect failed");
|
||||
|
||||
// Repos
|
||||
let thoughts = Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone()));
|
||||
let users = Arc::new(postgres::user::PgUserRepository::new(pool.clone()));
|
||||
let notifications = Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone()));
|
||||
|
||||
// ActivityPub service (for federation fan-out)
|
||||
let ap_service: Arc<dyn domain::ports::OutboundFederationPort> = Arc::new(
|
||||
ActivityPubService::new(
|
||||
Arc::new(PostgresFederationRepository::new(pool.clone())),
|
||||
Arc::new(PostgresApUserRepository::new(pool.clone(), base_url.to_string())),
|
||||
Arc::new(ThoughtsObjectHandler::new(
|
||||
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||
base_url,
|
||||
)),
|
||||
base_url.to_string(),
|
||||
false,
|
||||
"thoughts".to_string(),
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("ActivityPubService build failed"),
|
||||
);
|
||||
|
||||
// Application services
|
||||
let notification_svc = Arc::new(NotificationEventService {
|
||||
thoughts: thoughts.clone(),
|
||||
notifications,
|
||||
});
|
||||
let federation_svc = Arc::new(FederationEventService {
|
||||
thoughts,
|
||||
users,
|
||||
ap: ap_service,
|
||||
base_url: base_url.to_string(),
|
||||
});
|
||||
|
||||
// Thin handlers
|
||||
let handlers = WorkerHandlers {
|
||||
notification: NotificationHandler { service: notification_svc },
|
||||
federation: FederationHandler { service: federation_svc },
|
||||
};
|
||||
|
||||
// NATS consumer
|
||||
let nats_client = async_nats::connect(nats_url)
|
||||
.await
|
||||
.expect("NATS connect failed");
|
||||
let consumer = event_transport::EventConsumerAdapter::new(
|
||||
nats::NatsMessageSource::new(nats_client),
|
||||
);
|
||||
|
||||
(consumer, handlers)
|
||||
}
|
||||
@@ -1,275 +1,23 @@
|
||||
use std::sync::Arc;
|
||||
use chrono::Utc;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::notification::{Notification, NotificationType},
|
||||
ports::{NotificationRepository, ThoughtRepository},
|
||||
value_objects::NotificationId,
|
||||
};
|
||||
use application::services::{FederationEventService, NotificationEventService};
|
||||
use domain::{errors::DomainError, events::DomainEvent};
|
||||
|
||||
/// Handles domain events that should create notifications for users.
|
||||
pub struct NotificationHandler {
|
||||
pub thoughts: Arc<dyn ThoughtRepository>,
|
||||
pub notifications: Arc<dyn NotificationRepository>,
|
||||
pub service: Arc<NotificationEventService>,
|
||||
}
|
||||
|
||||
impl NotificationHandler {
|
||||
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
match event {
|
||||
DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()), // thought deleted — skip
|
||||
};
|
||||
if thought.user_id == *user_id { return Ok(()); } // no self-notifications
|
||||
self.notifications.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: thought.user_id,
|
||||
notification_type: NotificationType::Like,
|
||||
from_user_id: Some(user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
}).await
|
||||
}
|
||||
DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => {
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
if thought.user_id == *user_id { return Ok(()); }
|
||||
self.notifications.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: thought.user_id,
|
||||
notification_type: NotificationType::Boost,
|
||||
from_user_id: Some(user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
}).await
|
||||
}
|
||||
DomainEvent::FollowAccepted { follower_id, following_id } => {
|
||||
// The person being followed (following_id) gets notified
|
||||
self.notifications.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: following_id.clone(),
|
||||
notification_type: NotificationType::Follow,
|
||||
from_user_id: Some(follower_id.clone()),
|
||||
thought_id: None,
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
}).await
|
||||
}
|
||||
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => {
|
||||
let reply_to_id = match in_reply_to_id {
|
||||
Some(id) => id,
|
||||
None => return Ok(()), // not a reply
|
||||
};
|
||||
let original = match self.thoughts.find_by_id(reply_to_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()), // original deleted
|
||||
};
|
||||
if original.user_id == *user_id { return Ok(()); } // no self-notifications
|
||||
self.notifications.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: original.user_id,
|
||||
notification_type: NotificationType::Reply,
|
||||
from_user_id: Some(user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
}).await
|
||||
}
|
||||
// All other events: no notification needed in Plan 3
|
||||
_ => Ok(()),
|
||||
}
|
||||
self.service.process(event).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Stub handler for ActivityPub federation — implemented in Plan 4.
|
||||
pub struct FederationHandler;
|
||||
pub struct FederationHandler {
|
||||
pub service: Arc<FederationEventService>,
|
||||
}
|
||||
|
||||
impl FederationHandler {
|
||||
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
tracing::debug!(?event, "federation handler (stub — Plan 4)");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use domain::{
|
||||
models::{thought::{Thought, Visibility}, user::User},
|
||||
testing::TestStore,
|
||||
value_objects::*,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
fn alice() -> User {
|
||||
User::new_local(
|
||||
UserId::new(),
|
||||
Username::new("alice").unwrap(),
|
||||
Email::new("alice@ex.com").unwrap(),
|
||||
PasswordHash("h".into()),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn like_added_creates_notification_for_thought_author() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let bob_id = UserId::new();
|
||||
|
||||
let thought = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("hello").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let handler = NotificationHandler {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
|
||||
handler.handle(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: bob_id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
}).await.unwrap();
|
||||
|
||||
let notifs = store.notifications.lock().unwrap();
|
||||
assert_eq!(notifs.len(), 1);
|
||||
assert_eq!(notifs[0].user_id, alice.id);
|
||||
assert!(matches!(notifs[0].notification_type, NotificationType::Like));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn self_like_does_not_create_notification() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let thought = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("hello").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let handler = NotificationHandler {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
|
||||
handler.handle(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
}).await.unwrap();
|
||||
|
||||
assert!(store.notifications.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn follow_accepted_creates_notification() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let bob_id = UserId::new();
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
|
||||
let handler = NotificationHandler {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
|
||||
handler.handle(&DomainEvent::FollowAccepted {
|
||||
follower_id: bob_id.clone(),
|
||||
following_id: alice.id.clone(),
|
||||
}).await.unwrap();
|
||||
|
||||
let notifs = store.notifications.lock().unwrap();
|
||||
assert_eq!(notifs.len(), 1);
|
||||
assert_eq!(notifs[0].user_id, alice.id);
|
||||
assert!(matches!(notifs[0].notification_type, NotificationType::Follow));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reply_creates_notification_for_original_author() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let bob_id = UserId::new();
|
||||
|
||||
let original = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("original thought").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(original.clone());
|
||||
|
||||
let handler = NotificationHandler {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
|
||||
handler.handle(&DomainEvent::ThoughtCreated {
|
||||
thought_id: ThoughtId::new(),
|
||||
user_id: bob_id.clone(),
|
||||
in_reply_to_id: Some(original.id.clone()),
|
||||
}).await.unwrap();
|
||||
|
||||
let notifs = store.notifications.lock().unwrap();
|
||||
assert_eq!(notifs.len(), 1);
|
||||
assert_eq!(notifs[0].user_id, alice.id);
|
||||
assert!(matches!(notifs[0].notification_type, NotificationType::Reply));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn self_reply_does_not_create_notification() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
let original = Thought::new_local(
|
||||
ThoughtId::new(), alice.id.clone(),
|
||||
Content::new_local("original").unwrap(),
|
||||
None, Visibility::Public, None, false,
|
||||
);
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
store.thoughts.lock().unwrap().push(original.clone());
|
||||
|
||||
let handler = NotificationHandler {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
|
||||
handler.handle(&DomainEvent::ThoughtCreated {
|
||||
thought_id: ThoughtId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: Some(original.id.clone()),
|
||||
}).await.unwrap();
|
||||
|
||||
assert!(store.notifications.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thought_without_reply_to_creates_no_notification() {
|
||||
let store = TestStore::default();
|
||||
let alice = alice();
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
|
||||
let handler = NotificationHandler {
|
||||
thoughts: Arc::new(store.clone()),
|
||||
notifications: Arc::new(store.clone()),
|
||||
};
|
||||
|
||||
handler.handle(&DomainEvent::ThoughtCreated {
|
||||
thought_id: ThoughtId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
in_reply_to_id: None,
|
||||
}).await.unwrap();
|
||||
|
||||
assert!(store.notifications.lock().unwrap().is_empty());
|
||||
self.service.process(event).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
mod factory;
|
||||
mod handlers;
|
||||
|
||||
use std::sync::Arc;
|
||||
use futures::StreamExt;
|
||||
use sqlx::PgPool;
|
||||
use domain::ports::EventConsumer;
|
||||
|
||||
#[tokio::main]
|
||||
@@ -14,22 +13,12 @@ async fn main() {
|
||||
|
||||
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
|
||||
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
|
||||
let base_url = std::env::var("BASE_URL").expect("BASE_URL required");
|
||||
|
||||
tracing::info!("Connecting to postgres...");
|
||||
let pool = PgPool::connect(&database_url).await.expect("DB connect failed");
|
||||
|
||||
tracing::info!("Connecting to NATS at {nats_url}...");
|
||||
let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed");
|
||||
let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client));
|
||||
|
||||
let notification_handler = handlers::NotificationHandler {
|
||||
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
|
||||
notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
|
||||
};
|
||||
let federation_handler = handlers::FederationHandler;
|
||||
tracing::info!("Building worker...");
|
||||
let (consumer, handlers) = factory::build(&database_url, &base_url, &nats_url).await;
|
||||
|
||||
tracing::info!("Worker started, consuming events...");
|
||||
|
||||
let mut stream = consumer.consume();
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
@@ -37,20 +26,18 @@ async fn main() {
|
||||
let event = &envelope.event;
|
||||
tracing::debug!(?event, "received event");
|
||||
|
||||
let n_result = notification_handler.handle(event).await;
|
||||
let f_result = federation_handler.handle(event).await;
|
||||
let n = handlers.notification.handle(event).await;
|
||||
let f = handlers.federation.handle(event).await;
|
||||
|
||||
if n_result.is_ok() && f_result.is_ok() {
|
||||
if n.is_ok() && f.is_ok() {
|
||||
(envelope.ack)();
|
||||
} else {
|
||||
if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); }
|
||||
if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); }
|
||||
if let Err(e) = n { tracing::error!("notification handler: {e}"); }
|
||||
if let Err(e) = f { tracing::error!("federation handler: {e}"); }
|
||||
(envelope.nack)();
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("consumer error: {e}");
|
||||
}
|
||||
Err(e) => tracing::error!("consumer error: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user