separation of activitypub

This commit is contained in:
2026-05-09 17:23:06 +02:00
parent 69f6587623
commit 8819266cf9
32 changed files with 2005 additions and 436 deletions

View File

@@ -1,37 +1,31 @@
use activitypub_federation::{
activity_sending::SendActivityTask,
fetch::object_id::ObjectId,
protocol::context::WithContext,
traits::Object,
};
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
ports::MovieRepository,
value_objects::{ReviewId, UserId},
};
use event_publisher::EventHandler;
use url::Url;
use std::sync::Arc;
use crate::{
activities::CreateActivity,
actors::get_local_actor,
federation::ApFederationConfig,
objects::DbReview,
repository::FollowerStatus,
};
use activitypub_base::ActivityPubService;
use crate::objects::review_to_ap_object;
use crate::urls::{actor_url, review_url};
pub struct ActivityPubEventHandler {
federation_config: ApFederationConfig,
ap_service: Arc<ActivityPubService>,
movie_repo: Arc<dyn MovieRepository>,
base_url: String,
}
impl ActivityPubEventHandler {
pub fn new(federation_config: ApFederationConfig, base_url: String) -> Self {
Self {
federation_config,
base_url,
}
pub fn new(
ap_service: Arc<ActivityPubService>,
movie_repo: Arc<dyn MovieRepository>,
base_url: String,
) -> Self {
Self { ap_service, movie_repo, base_url }
}
}
@@ -39,11 +33,7 @@ impl ActivityPubEventHandler {
impl EventHandler for ActivityPubEventHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
match event {
DomainEvent::ReviewLogged {
review_id,
user_id,
..
} => self
DomainEvent::ReviewLogged { review_id, user_id, .. } => self
.on_review_logged(user_id, review_id)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string())),
@@ -58,70 +48,29 @@ impl ActivityPubEventHandler {
user_id: &UserId,
review_id: &ReviewId,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data();
let followers = data.federation_repo.get_followers(user_id.clone()).await?;
tracing::debug!(user_id = %user_id.value(), count = followers.len(), "AP: got followers for review");
let accepted: Vec<_> = followers
.into_iter()
.filter(|f| f.status == FollowerStatus::Accepted)
.collect();
tracing::debug!(accepted = accepted.len(), "AP: accepted followers");
if accepted.is_empty() {
return Ok(());
}
let review = match data.movie_repo.get_review_by_id(review_id).await? {
let review = match self.movie_repo.get_review_by_id(review_id).await? {
Some(r) => r,
None => return Ok(()),
};
let local_actor = get_local_actor(user_id.clone(), &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let ap_id = review_url(&self.base_url, review_id);
let actor = actor_url(&self.base_url, user_id.value());
let activity_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
let movie = self.movie_repo.get_movie_by_id(review.movie_id()).await.ok().flatten();
let movie_title = movie.as_ref()
.map(|m| m.title().value().to_string())
.unwrap_or_else(|| "Unknown".to_string());
let release_year = movie.as_ref().map(|m| m.release_year().value()).unwrap_or(0);
let poster_url = movie.as_ref()
.and_then(|m| m.poster_path())
.map(|p| format!("{}/posters/{}", self.base_url, p.value()));
let db_review = DbReview {
ap_id: crate::urls::review_url(&self.base_url, review_id),
review,
};
let object = db_review
.into_json(&data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let obj = review_to_ap_object(&review, ap_id.clone(), actor, movie_title, release_year, poster_url);
let json = serde_json::to_value(obj)?;
let create = CreateActivity {
id: activity_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object,
};
let create_with_ctx = WithContext::new_default(create);
let inboxes: Vec<Url> = accepted
.iter()
.filter_map(|f| {
let url = Url::parse(&f.actor.inbox_url);
if url.is_err() {
tracing::warn!(inbox = %f.actor.inbox_url, "AP: invalid inbox URL, skipping follower");
}
url.ok()
})
.collect();
tracing::debug!(inboxes = inboxes.len(), "AP: delivering to inboxes");
let sends =
SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?;
tracing::debug!(sends = sends.len(), "AP: prepared sends");
let failures = crate::service::send_with_retry(sends, &data).await;
if !failures.is_empty() {
tracing::warn!(count = failures.len(), "some activity deliveries failed permanently");
}
self.ap_service
.broadcast_to_followers(user_id.value(), ap_id, json)
.await?;
Ok(())
}

View File

@@ -1,22 +1,17 @@
pub mod activities;
pub mod actor_handler;
pub mod actors;
pub mod data;
pub mod error;
pub mod event_handler;
pub mod federation;
pub mod followers_handler;
pub mod inbox;
pub mod objects;
pub mod outbox;
pub mod repository;
pub mod service;
pub mod remote_review_repository;
pub mod review_handler;
pub mod user_adapter;
pub(crate) mod urls;
pub mod webfinger;
pub use data::FederationData;
pub use error::Error;
// Re-export the generic base types that callers need
pub use activitypub_base::{
ActivityPubService, ApFederationConfig, ApObjectHandler, ApUser, ApUserRepository,
FederationData, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor,
};
pub use event_handler::ActivityPubEventHandler;
pub use federation::ApFederationConfig;
pub use repository::{FederationRepository, Follower, FollowerStatus, RemoteActor};
pub use service::ActivityPubService;
pub use remote_review_repository::RemoteReviewRepository;
pub use review_handler::ReviewObjectHandler;
pub use user_adapter::DomainUserRepoAdapter;

View File

@@ -1,33 +1,22 @@
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::object::NoteType,
protocol::verification::verify_domains_match,
traits::Object,
};
use activitypub_federation::kinds::object::NoteType;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use url::Url;
use domain::models::{Review, ReviewSource};
use domain::value_objects::ReviewId;
use crate::actors::DbActor;
use crate::data::FederationData;
use crate::error::Error;
use domain::models::Review;
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReviewObject {
#[serde(rename = "type")]
pub(crate) kind: NoteType,
pub(crate) id: ObjectId<DbReview>,
pub(crate) attributed_to: ObjectId<DbActor>,
pub(crate) id: Url,
pub(crate) attributed_to: Url,
pub(crate) content: String,
pub(crate) published: DateTime<Utc>,
pub(crate) movie_title: String,
#[serde(default)]
pub(crate) release_year: u16, // 0 = unknown; default for old AP messages
pub(crate) release_year: u16,
#[serde(default)]
pub(crate) poster_url: Option<String>,
pub(crate) rating: u8,
@@ -35,115 +24,36 @@ pub struct ReviewObject {
pub(crate) watched_at: DateTime<Utc>,
}
#[derive(Debug, Clone)]
pub struct DbReview {
pub review: Review,
pub ap_id: Url,
}
/// Serialize a local Review into a ReviewObject for AP delivery.
/// Takes movie metadata explicitly since the handler fetches it separately.
pub fn review_to_ap_object(
review: &Review,
ap_id: Url,
actor_url: Url,
movie_title: String,
release_year: u16,
poster_url: Option<String>,
) -> ReviewObject {
let stars: String = "\u{2B50}".repeat(review.rating().value() as usize);
let comment_text = review.comment().map(|c| c.value().to_string());
let year_str = if release_year > 0 { format!(" ({})", release_year) } else { String::new() };
let watched_str = format!("Watched: {}", review.watched_at().format("%b %-d, %Y"));
let content = match &comment_text {
Some(c) => format!("{} {}{}\n{}\n{}", stars, movie_title, year_str, c, watched_str),
None => format!("{} {}{}\n{}", stars, movie_title, year_str, watched_str),
};
#[async_trait::async_trait]
impl Object for DbReview {
type DataType = FederationData;
type Kind = ReviewObject;
type Error = Error;
fn id(&self) -> &Url {
&self.ap_id
}
async fn read_from_id(
_object_id: Url,
_data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
// Incoming activities provide the full object; no need to dereference local reviews
Ok(None)
}
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
let r = &self.review;
let ap_id = crate::urls::review_url(&data.base_url, r.id());
let actor_url = crate::urls::actor_url(&data.base_url, r.user_id());
let movie = data.movie_repo.get_movie_by_id(r.movie_id()).await
.ok().flatten();
let movie_title = movie.as_ref()
.map(|m| m.title().value().to_string())
.unwrap_or_else(|| "Unknown".to_string());
let release_year = movie.as_ref()
.map(|m| m.release_year().value())
.unwrap_or(0);
let poster_url = movie.as_ref()
.and_then(|m| m.poster_path())
.map(|p| format!("{}/posters/{}", data.base_url, p.value()));
let stars: String = "\u{2B50}".repeat(r.rating().value() as usize);
let comment_text = r.comment().map(|c| c.value().to_string());
let year_str = if release_year > 0 { format!(" ({})", release_year) } else { String::new() };
let watched_str = format!("Watched: {}", r.watched_at().format("%b %-d, %Y"));
let content = match &comment_text {
Some(c) => format!("{} {}{}\n{}\n{}", stars, movie_title, year_str, c, watched_str),
None => format!("{} {}{}\n{}", stars, movie_title, year_str, watched_str),
};
Ok(ReviewObject {
kind: NoteType::default(),
id: ap_id.into(),
attributed_to: actor_url.into(),
content,
published: DateTime::from_naive_utc_and_offset(*r.created_at(), Utc),
movie_title,
release_year,
poster_url,
rating: r.rating().value(),
comment: comment_text,
watched_at: DateTime::from_naive_utc_and_offset(*r.watched_at(), Utc),
})
}
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.attributed_to.inner(), expected_domain)?;
Ok(())
}
async fn from_json(
json: Self::Kind,
data: &Data<Self::DataType>,
) -> Result<Self, Self::Error> {
let actor_url = json.attributed_to.inner().to_string();
let review_id = ReviewId::generate();
let movie_id_uuid = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, json.movie_title.as_bytes());
let movie_id = domain::value_objects::MovieId::from_uuid(movie_id_uuid);
let user_id_uuid = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, actor_url.as_bytes());
let user_id = domain::value_objects::UserId::from_uuid(user_id_uuid);
let rating = domain::value_objects::Rating::new(json.rating.min(5))
.map_err(|e| Error::bad_request(anyhow::anyhow!("{}", e)))?;
let comment = json
.comment
.map(|c| domain::value_objects::Comment::new(c))
.transpose()
.map_err(|e| Error::bad_request(anyhow::anyhow!("{}", e)))?;
let watched_at = json.watched_at.naive_utc();
let created_at = json.published.naive_utc();
let review = Review::from_persistence(
review_id,
movie_id,
user_id,
rating,
comment,
watched_at,
created_at,
ReviewSource::Remote { actor_url },
);
let ap_id_url = json.id.into_inner();
data.federation_repo.save_remote_review(&review, ap_id_url.as_str(), &json.movie_title, json.release_year, json.poster_url.as_deref()).await?;
Ok(DbReview { review, ap_id: ap_id_url })
ReviewObject {
kind: NoteType::default(),
id: ap_id,
attributed_to: actor_url,
content,
published: DateTime::from_naive_utc_and_offset(*review.created_at(), Utc),
movie_title,
release_year,
poster_url,
rating: review.rating().value(),
comment: comment_text,
watched_at: DateTime::from_naive_utc_and_offset(*review.watched_at(), Utc),
}
}

View File

@@ -0,0 +1,29 @@
use anyhow::Result;
use async_trait::async_trait;
use chrono::NaiveDateTime;
use domain::models::Review;
#[async_trait]
pub trait RemoteReviewRepository: Send + Sync {
async fn save_remote_review(
&self,
review: &Review,
ap_id: &str,
movie_title: &str,
release_year: u16,
poster_url: Option<&str>,
) -> Result<()>;
async fn delete_remote_review(&self, ap_id: &str, actor_url: &str) -> Result<()>;
async fn update_remote_review(
&self,
ap_id: &str,
actor_url: &str,
rating: u8,
comment: Option<&str>,
watched_at: NaiveDateTime,
) -> Result<()>;
async fn delete_by_actor(&self, actor_url: &str) -> Result<()>;
}

View File

@@ -0,0 +1,138 @@
use std::sync::Arc;
use activitypub_base::ApObjectHandler;
use async_trait::async_trait;
use domain::{
models::{Review, ReviewSource},
ports::MovieRepository,
value_objects::{Comment, MovieId, Rating, ReviewId, UserId},
};
use url::Url;
use crate::objects::{review_to_ap_object, ReviewObject};
use crate::remote_review_repository::RemoteReviewRepository;
use crate::urls::{actor_url, review_url};
pub struct ReviewObjectHandler {
pub movie_repo: Arc<dyn MovieRepository>,
pub review_store: Arc<dyn RemoteReviewRepository>,
pub base_url: String,
}
#[async_trait]
impl ApObjectHandler for ReviewObjectHandler {
async fn get_local_objects_for_user(
&self,
user_id: uuid::Uuid,
) -> anyhow::Result<Vec<(Url, serde_json::Value)>> {
let domain_user_id = UserId::from_uuid(user_id);
let history = self.movie_repo.get_user_history(&domain_user_id).await?;
let mut results = Vec::new();
for entry in history {
let review = entry.review();
if !matches!(review.source(), ReviewSource::Local) {
continue;
}
let ap_id = review_url(&self.base_url, review.id());
let actor_url = actor_url(&self.base_url, user_id);
let movie = self.movie_repo.get_movie_by_id(review.movie_id()).await.ok().flatten();
let movie_title = movie.as_ref()
.map(|m| m.title().value().to_string())
.unwrap_or_else(|| "Unknown".to_string());
let release_year = movie.as_ref()
.map(|m| m.release_year().value())
.unwrap_or(0);
let poster_url = movie.as_ref()
.and_then(|m| m.poster_path())
.map(|p| format!("{}/posters/{}", self.base_url, p.value()));
let obj = review_to_ap_object(review, ap_id.clone(), actor_url, movie_title, release_year, poster_url);
let json = serde_json::to_value(obj)?;
results.push((ap_id, json));
}
Ok(results)
}
async fn on_create(
&self,
_ap_id: &Url,
_actor_url: &Url,
object: serde_json::Value,
) -> anyhow::Result<()> {
let obj: ReviewObject = match serde_json::from_value(object) {
Ok(o) => o,
Err(e) => {
tracing::debug!("ignoring unrecognized Create object: {}", e);
return Ok(());
}
};
let actor_url_str = obj.attributed_to.to_string();
let review_id = ReviewId::generate();
let movie_id = MovieId::from_uuid(uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, obj.movie_title.as_bytes()));
let user_id = UserId::from_uuid(uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, actor_url_str.as_bytes()));
let rating = Rating::new(obj.rating.min(5))?;
let comment = obj.comment.map(Comment::new).transpose()?;
let review = Review::from_persistence(
review_id,
movie_id,
user_id,
rating,
comment,
obj.watched_at.naive_utc(),
obj.published.naive_utc(),
ReviewSource::Remote { actor_url: actor_url_str },
);
self.review_store
.save_remote_review(&review, obj.id.as_str(), &obj.movie_title, obj.release_year, obj.poster_url.as_deref())
.await?;
Ok(())
}
async fn on_update(
&self,
ap_id: &Url,
actor_url: &Url,
object: serde_json::Value,
) -> anyhow::Result<()> {
let obj: ReviewObject = match serde_json::from_value(object) {
Ok(o) => o,
Err(_) => {
tracing::debug!(actor = %actor_url, "ignoring non-review Update activity");
return Ok(());
}
};
if obj.attributed_to != *actor_url {
anyhow::bail!("update actor does not match object attributed_to");
}
self.review_store
.update_remote_review(
ap_id.as_str(),
actor_url.as_str(),
obj.rating.min(5),
obj.comment.as_deref(),
obj.watched_at.naive_utc(),
)
.await?;
Ok(())
}
async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()> {
self.review_store
.delete_remote_review(ap_id.as_str(), actor_url.as_str())
.await
}
async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()> {
self.review_store.delete_by_actor(actor_url.as_str()).await
}
}

View File

@@ -1,26 +1,9 @@
use url::Url;
use uuid::Uuid;
use domain::value_objects::{UserId, ReviewId};
use crate::error::Error;
/// Extracts a UserId from a URL like `https://example.com/users/{uuid}[/...]`
pub fn extract_user_id_from_url(url: &Url) -> Option<UserId> {
let path = url.path();
path.strip_prefix("/users/")
.and_then(|s| s.split('/').next())
.and_then(|uid_str| Uuid::parse_str(uid_str).ok())
.map(UserId::from_uuid)
}
/// Generates a fresh activity URL: `{base_url}/activities/{uuid}`
pub fn activity_url(base_url: &str) -> Result<Url, Error> {
Url::parse(&format!("{}/activities/{}", base_url, Uuid::new_v4()))
.map_err(|e| Error::bad_request(anyhow::anyhow!(e)))
}
use domain::value_objects::ReviewId;
/// Builds the canonical actor URL: `{base_url}/users/{user_id}`
pub fn actor_url(base_url: &str, user_id: &UserId) -> Url {
Url::parse(&format!("{}/users/{}", base_url, user_id.value()))
pub fn actor_url(base_url: &str, user_id: uuid::Uuid) -> Url {
Url::parse(&format!("{}/users/{}", base_url, user_id))
.expect("base_url is always a valid URL prefix")
}
@@ -29,10 +12,3 @@ pub fn review_url(base_url: &str, review_id: &ReviewId) -> Url {
Url::parse(&format!("{}/reviews/{}", base_url, review_id.value()))
.expect("base_url is always a valid URL prefix")
}
/// Stable Create-activity URL derived from review ID.
/// Deterministic so repeated backfills to different followers don't create duplicate posts.
pub fn create_activity_url(base_url: &str, review_id: &ReviewId) -> Result<Url, Error> {
Url::parse(&format!("{}/activities/create/{}", base_url, review_id.value()))
.map_err(|e| Error::bad_request(anyhow::anyhow!(e)))
}

View File

@@ -0,0 +1,28 @@
use std::sync::Arc;
use activitypub_base::{ApUser, ApUserRepository};
use async_trait::async_trait;
use domain::{ports::UserRepository, value_objects::UserId};
pub struct DomainUserRepoAdapter(pub Arc<dyn UserRepository>);
#[async_trait]
impl ApUserRepository for DomainUserRepoAdapter {
async fn find_by_id(&self, id: uuid::Uuid) -> anyhow::Result<Option<ApUser>> {
let user_id = UserId::from_uuid(id);
Ok(self.0.find_by_id(&user_id).await?.map(|u| ApUser {
id: u.id().value(),
username: u.username().value().to_string(),
}))
}
async fn find_by_username(&self, username: &str) -> anyhow::Result<Option<ApUser>> {
use domain::value_objects::Username;
let uname = Username::new(username.to_string())
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
Ok(self.0.find_by_username(&uname).await?.map(|u| ApUser {
id: u.id().value(),
username: u.username().value().to_string(),
}))
}
}