federation refinement

This commit is contained in:
2026-05-09 13:53:45 +02:00
parent df71748897
commit 470b29c9e1
56 changed files with 1513 additions and 544 deletions

View File

@@ -2,9 +2,9 @@ use activitypub_federation::{
activity_sending::SendActivityTask,
fetch::object_id::ObjectId,
protocol::context::WithContext,
traits::Object,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{
errors::DomainError,
events::DomainEvent,
@@ -15,9 +15,9 @@ use url::Url;
use crate::{
activities::CreateActivity,
actors::{actor_url, get_local_actor},
actors::get_local_actor,
federation::ApFederationConfig,
objects::{review_url, ReviewObject},
objects::DbReview,
repository::FollowerStatus,
};
@@ -42,11 +42,9 @@ impl EventHandler for ActivityPubEventHandler {
DomainEvent::ReviewLogged {
review_id,
user_id,
rating,
watched_at,
..
} => self
.on_review_logged(user_id, review_id, rating.value(), *watched_at)
.on_review_logged(user_id, review_id)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string())),
_ => Ok(()),
@@ -59,47 +57,42 @@ impl ActivityPubEventHandler {
&self,
user_id: &UserId,
review_id: &ReviewId,
rating: u8,
watched_at: chrono::NaiveDateTime,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data();
let followers = data.federation_repo.get_followers(user_id.clone()).await?;
tracing::debug!(user_id = %user_id.value(), count = followers.len(), "AP: got followers for review");
let accepted: Vec<_> = followers
.into_iter()
.filter(|f| f.status == FollowerStatus::Accepted)
.collect();
tracing::debug!(accepted = accepted.len(), "AP: accepted followers");
if accepted.is_empty() {
return Ok(());
}
let review = match data.movie_repo.get_review_by_id(review_id).await? {
Some(r) => r,
None => return Ok(()),
};
let local_actor = get_local_actor(user_id.clone(), &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let review_id_url = review_url(&self.base_url, review_id);
let actor_id = actor_url(&self.base_url, user_id);
let activity_id = Url::parse(&format!(
"{}/activities/{}",
self.base_url,
uuid::Uuid::new_v4()
))?;
let activity_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
let stars = "\u{2B50}".repeat(rating as usize);
let now = DateTime::from_naive_utc_and_offset(watched_at, Utc);
let object = ReviewObject {
kind: "Review".to_string(),
id: review_id_url.into(),
attributed_to: actor_id.into(),
content: format!("{} (movie review)", stars),
published: Utc::now(),
movie_title: "Unknown".to_string(), // TODO: fetch from MovieRepository
rating,
comment: None,
watched_at: now,
let db_review = DbReview {
ap_id: crate::urls::review_url(&self.base_url, review_id),
review,
};
let object = db_review
.into_json(&data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let create = CreateActivity {
id: activity_id,
@@ -111,15 +104,23 @@ impl ActivityPubEventHandler {
let inboxes: Vec<Url> = accepted
.iter()
.filter_map(|f| Url::parse(&f.actor.inbox_url).ok())
.filter_map(|f| {
let url = Url::parse(&f.actor.inbox_url);
if url.is_err() {
tracing::warn!(inbox = %f.actor.inbox_url, "AP: invalid inbox URL, skipping follower");
}
url.ok()
})
.collect();
tracing::debug!(inboxes = inboxes.len(), "AP: delivering to inboxes");
let sends =
SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?;
for send in sends {
if let Err(e) = send.sign_and_send(&data).await {
tracing::warn!(error = %e, "failed to deliver activity to follower");
}
tracing::debug!(sends = sends.len(), "AP: prepared sends");
let failures = crate::service::send_with_retry(sends, &data).await;
if !failures.is_empty() {
tracing::warn!(count = failures.len(), "some activity deliveries failed permanently");
}
Ok(())