diff --git a/Cargo.lock b/Cargo.lock index b950628..45d9bb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,16 +5,32 @@ version = 4 [[package]] name = "activitypub" version = "0.1.0" +dependencies = [ + "activitypub-base", + "activitypub_federation", + "anyhow", + "async-trait", + "chrono", + "domain", + "event-publisher", + "serde", + "serde_json", + "tokio", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "activitypub-base" +version = "0.1.0" dependencies = [ "activitypub_federation", "anyhow", - "application", "async-trait", "axum", "chrono", - "domain", "enum_delegate", - "event-publisher", "reqwest 0.13.3", "serde", "serde_json", @@ -4005,6 +4021,7 @@ name = "sqlite" version = "0.1.0" dependencies = [ "activitypub", + "activitypub-base", "anyhow", "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index dd0631e..8f731d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "crates/adapters/sqlite", "crates/adapters/template-askama", "crates/adapters/activitypub", + "crates/adapters/activitypub-base", "crates/application", "crates/domain", "crates/presentation", @@ -49,3 +50,4 @@ rss = { path = "crates/adapters/rss" } sqlite = { path = "crates/adapters/sqlite" } template-askama = { path = "crates/adapters/template-askama" } activitypub = { path = "crates/adapters/activitypub" } +activitypub-base = { path = "crates/adapters/activitypub-base" } diff --git a/crates/adapters/activitypub-base/Cargo.toml b/crates/adapters/activitypub-base/Cargo.toml new file mode 100644 index 0000000..fcc3694 --- /dev/null +++ b/crates/adapters/activitypub-base/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "activitypub-base" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +reqwest = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +thiserror = { workspace = true } +anyhow = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } + +activitypub_federation = "0.7.0-beta.11" +url = { version = "2", features = ["serde"] } +enum_delegate = "0.2" +axum = "0.8" diff --git a/crates/adapters/activitypub-base/src/activities.rs b/crates/adapters/activitypub-base/src/activities.rs new file mode 100644 index 0000000..fa90f6e --- /dev/null +++ b/crates/adapters/activitypub-base/src/activities.rs @@ -0,0 +1,341 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::{AcceptType, CreateType, DeleteType, FollowType, RejectType, UndoType, UpdateType}, + traits::Activity, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; +use crate::repository::{FollowerStatus, FollowingStatus}; + +// --- Follow --- + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FollowActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: FollowType, + pub(crate) actor: ObjectId, + pub(crate) object: ObjectId, +} + +#[async_trait::async_trait] +impl Activity for FollowActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, data: &Data) -> Result<(), Self::Error> { + let target_url = self.object.inner(); + let target_domain = match (target_url.host_str(), target_url.port()) { + (Some(host), Some(port)) => format!("{}:{}", host, port), + (Some(host), None) => host.to_string(), + _ => return Err(Error::bad_request(anyhow::anyhow!("invalid follow target URL"))), + }; + if target_domain != data.domain { + return Err(Error::bad_request(anyhow::anyhow!("follow target is not a local actor"))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let _follower = self.actor.dereference(data).await?; + let local_actor = self.object.dereference(data).await?; + + data.federation_repo + .add_follower( + local_actor.user_id, + self.actor.inner().as_str(), + FollowerStatus::Pending, + self.id.as_str(), + ) + .await?; + + tracing::info!( + follower = %self.actor.inner(), + local_user = %local_actor.user_id, + "follow request pending approval" + ); + Ok(()) + } +} + +// --- Accept --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AcceptActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: AcceptType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for AcceptActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner()) + .ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?; + data.federation_repo + .update_following_status(local_user_id, self.actor.inner().as_str(), FollowingStatus::Accepted) + .await?; + tracing::info!(remote_actor = %self.actor.inner(), "follow accepted by remote"); + Ok(()) + } +} + +// --- Reject --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RejectActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: RejectType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for RejectActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.actor.inner()) { + data.federation_repo + .remove_following(user_id, self.actor.inner().as_str()) + .await?; + } + tracing::info!(actor = %self.actor.inner(), "follow rejected"); + Ok(()) + } +} + +// --- Undo --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UndoActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: UndoType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for UndoActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + if let Some(user_id) = crate::urls::extract_user_id_from_url(self.object.object.inner()) { + data.federation_repo + .remove_follower(user_id, self.actor.inner().as_str()) + .await?; + } + data.object_handler + .on_actor_removed(self.actor.inner()) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %self.actor.inner(), "unfollowed"); + Ok(()) + } +} + +// --- Create --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: CreateType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, +} + +#[async_trait::async_trait] +impl Activity for CreateActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let ap_id = self.id.clone(); + let actor_url = self.actor.inner().clone(); + data.object_handler + .on_create(&ap_id, &actor_url, self.object) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received create activity"); + Ok(()) + } +} + +// --- Delete --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DeleteActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: DeleteType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, +} + +#[async_trait::async_trait] +impl Activity for DeleteActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let actor_url = self.actor.inner().clone(); + data.object_handler + .on_delete(&self.object, &actor_url) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(object = %self.object, "received delete activity"); + Ok(()) + } +} + +// --- Update --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateActivity { + pub(crate) id: Url, + #[serde(rename = "type", default)] + pub(crate) kind: UpdateType, + pub(crate) actor: ObjectId, + pub(crate) object: serde_json::Value, +} + +#[async_trait::async_trait] +impl Activity for UpdateActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let ap_id = self.id.clone(); + let actor_url = self.actor.inner().clone(); + data.object_handler + .on_update(&ap_id, &actor_url, self.object) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + tracing::info!(actor = %actor_url, "received update activity"); + Ok(()) + } +} + +// --- Inbox dispatch enum --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "type")] +#[enum_delegate::implement(Activity)] +pub enum InboxActivities { + #[serde(rename = "Follow")] + Follow(FollowActivity), + #[serde(rename = "Accept")] + Accept(AcceptActivity), + #[serde(rename = "Reject")] + Reject(RejectActivity), + #[serde(rename = "Undo")] + Undo(UndoActivity), + #[serde(rename = "Create")] + Create(CreateActivity), + #[serde(rename = "Delete")] + Delete(DeleteActivity), + #[serde(rename = "Update")] + Update(UpdateActivity), +} diff --git a/crates/adapters/activitypub-base/src/actor_handler.rs b/crates/adapters/activitypub-base/src/actor_handler.rs new file mode 100644 index 0000000..a702590 --- /dev/null +++ b/crates/adapters/activitypub-base/src/actor_handler.rs @@ -0,0 +1,21 @@ +use activitypub_federation::{ + axum::json::FederationJson, config::Data, protocol::context::WithContext, traits::Object, +}; +use axum::extract::Path; + +use crate::actors::{get_local_actor, Person}; +use crate::data::FederationData; +use crate::error::Error; + +pub async fn actor_handler( + Path(user_id_str): Path, + data: Data, +) -> Result>, Error> { + let uuid = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; + + let db_actor = get_local_actor(uuid, &data).await?; + let person = db_actor.into_json(&data).await?; + + Ok(FederationJson(WithContext::new_default(person))) +} diff --git a/crates/adapters/activitypub-base/src/actors.rs b/crates/adapters/activitypub-base/src/actors.rs new file mode 100644 index 0000000..0d292b2 --- /dev/null +++ b/crates/adapters/activitypub-base/src/actors.rs @@ -0,0 +1,230 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + http_signatures::generate_actor_keypair, + kinds::actor::PersonType, + protocol::{public_key::PublicKey, verification::verify_domains_match}, + traits::{Actor, Object}, +}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::data::FederationData; +use crate::error::Error; +use crate::repository::RemoteActor; + +#[derive(Debug, Clone)] +pub struct DbActor { + pub user_id: uuid::Uuid, + pub username: String, + pub public_key_pem: String, + pub private_key_pem: Option, + pub inbox_url: Url, + pub outbox_url: Url, + pub followers_url: Url, + pub following_url: Url, + pub ap_id: Url, + pub last_refreshed_at: DateTime, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Person { + #[serde(rename = "type")] + kind: PersonType, + id: ObjectId, + preferred_username: String, + inbox: Url, + outbox: Url, + followers: Url, + following: Url, + public_key: PublicKey, + name: Option, +} + +pub async fn get_local_actor( + user_id: uuid::Uuid, + data: &Data, +) -> Result { + let user = data + .user_repo + .find_by_id(user_id) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id)))?; + + let (public_key, private_key) = match data + .federation_repo + .get_local_actor_keypair(user_id) + .await? + { + Some(kp) => kp, + None => { + let kp = generate_actor_keypair()?; + data.federation_repo + .save_local_actor_keypair( + user_id, + kp.public_key.clone(), + kp.private_key.clone(), + ) + .await?; + (kp.public_key, kp.private_key) + } + }; + + let ap_id = crate::urls::actor_url(&data.base_url, user_id); + let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid inbox url"); + let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid outbox url"); + let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid followers url"); + let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid following url"); + + Ok(DbActor { + user_id, + username: user.username, + public_key_pem: public_key, + private_key_pem: Some(private_key), + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + }) +} + +#[async_trait::async_trait] +impl Object for DbActor { + type DataType = FederationData; + type Kind = Person; + type Error = Error; + + fn id(&self) -> &Url { + &self.ap_id + } + + fn last_refreshed_at(&self) -> Option> { + Some(self.last_refreshed_at) + } + + async fn read_from_id( + object_id: Url, + data: &Data, + ) -> Result, Self::Error> { + let user_id = match crate::urls::extract_user_id_from_url(&object_id) { + Some(id) => id, + None => return Ok(None), + }; + let user = match data.user_repo.find_by_id(user_id).await { + Ok(Some(u)) => u, + _ => return Ok(None), + }; + + let keypair = data + .federation_repo + .get_local_actor_keypair(user_id) + .await?; + + let (public_key, private_key) = match keypair { + Some(kp) => (kp.0, Some(kp.1)), + None => return Ok(None), + }; + + let ap_id = crate::urls::actor_url(&data.base_url, user_id); + let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url"); + let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url"); + let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid url"); + let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid url"); + + Ok(Some(DbActor { + user_id, + username: user.username, + public_key_pem: public_key, + private_key_pem: private_key, + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + })) + } + + async fn into_json(self, _data: &Data) -> Result { + let public_key = PublicKey { + id: format!("{}#main-key", &self.ap_id), + owner: self.ap_id.clone(), + public_key_pem: self.public_key_pem.clone(), + }; + + Ok(Person { + kind: Default::default(), + id: self.ap_id.clone().into(), + preferred_username: self.username.clone(), + inbox: self.inbox_url.clone(), + outbox: self.outbox_url.clone(), + followers: self.followers_url.clone(), + following: self.following_url.clone(), + public_key, + name: Some(self.username.clone()), + }) + } + + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + _data: &Data, + ) -> Result<(), Self::Error> { + verify_domains_match(json.id.inner(), expected_domain)?; + Ok(()) + } + + async fn from_json( + json: Self::Kind, + data: &Data, + ) -> Result { + let actor = RemoteActor { + url: json.id.inner().to_string(), + handle: json.preferred_username.clone(), + inbox_url: json.inbox.to_string(), + shared_inbox_url: None, + display_name: json.name.clone(), + }; + data.federation_repo.upsert_remote_actor(actor).await?; + + let url_str = json.id.inner().to_string(); + let user_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes()); + let ap_id = json.id.inner().clone(); + let inbox_url = json.inbox.clone(); + let outbox_url = json.outbox.clone(); + let followers_url = json.followers.clone(); + let following_url = json.following.clone(); + + Ok(DbActor { + user_id, + username: json.preferred_username.clone(), + public_key_pem: json.public_key.public_key_pem, + private_key_pem: None, + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + }) + } +} + +impl Actor for DbActor { + fn public_key_pem(&self) -> &str { + &self.public_key_pem + } + + fn private_key_pem(&self) -> Option { + self.private_key_pem.clone() + } + + fn inbox(&self) -> Url { + self.inbox_url.clone() + } +} diff --git a/crates/adapters/activitypub-base/src/content.rs b/crates/adapters/activitypub-base/src/content.rs new file mode 100644 index 0000000..69b54b6 --- /dev/null +++ b/crates/adapters/activitypub-base/src/content.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use url::Url; + +#[async_trait] +pub trait ApObjectHandler: Send + Sync { + /// Returns (ap_id, serialized object) for all local content owned by this user. + /// Used by outbox (count) and backfill (delivery). Must only return locally-authored content. + async fn get_local_objects_for_user( + &self, + user_id: uuid::Uuid, + ) -> anyhow::Result>; + + /// Incoming Create activity — persist remote content. + async fn on_create( + &self, + ap_id: &Url, + actor_url: &Url, + object: serde_json::Value, + ) -> anyhow::Result<()>; + + /// Incoming Update activity — update existing remote content. + async fn on_update( + &self, + ap_id: &Url, + actor_url: &Url, + object: serde_json::Value, + ) -> anyhow::Result<()>; + + /// Incoming Delete activity — remove specific remote content. + async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()>; + + /// Actor unfollowed/was removed — clean up all their remote content. + async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()>; +} diff --git a/crates/adapters/activitypub-base/src/data.rs b/crates/adapters/activitypub-base/src/data.rs new file mode 100644 index 0000000..cf2793a --- /dev/null +++ b/crates/adapters/activitypub-base/src/data.rs @@ -0,0 +1,38 @@ +use std::sync::Arc; + +use crate::content::ApObjectHandler; +use crate::repository::FederationRepository; +use crate::user::ApUserRepository; + +#[derive(Clone)] +pub struct FederationData { + pub(crate) federation_repo: Arc, + pub(crate) user_repo: Arc, + pub(crate) object_handler: Arc, + pub(crate) base_url: String, + pub(crate) domain: String, +} + +impl FederationData { + pub fn new( + federation_repo: Arc, + user_repo: Arc, + object_handler: Arc, + base_url: String, + ) -> Self { + let domain = base_url + .trim_start_matches("https://") + .trim_start_matches("http://") + .split('/') + .next() + .unwrap_or("") + .to_string(); + Self { + federation_repo, + user_repo, + object_handler, + base_url, + domain, + } + } +} diff --git a/crates/adapters/activitypub-base/src/error.rs b/crates/adapters/activitypub-base/src/error.rs new file mode 100644 index 0000000..d631755 --- /dev/null +++ b/crates/adapters/activitypub-base/src/error.rs @@ -0,0 +1,48 @@ +use std::fmt::{Display, Formatter}; + +use axum::http::StatusCode; + +#[derive(Debug)] +pub struct Error(pub(crate) anyhow::Error, pub(crate) StatusCode); + +impl Error { + pub fn not_found(e: impl Into) -> Self { + Self(e.into(), StatusCode::NOT_FOUND) + } + + pub fn bad_request(e: impl Into) -> Self { + Self(e.into(), StatusCode::BAD_REQUEST) + } +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl From for Error +where + T: Into, +{ + fn from(t: T) -> Self { + Error(t.into(), StatusCode::INTERNAL_SERVER_ERROR) + } +} + +impl axum::response::IntoResponse for Error { + fn into_response(self) -> axum::response::Response { + let status = self.1; + if status.is_server_error() { + tracing::error!(error = %self.0, status = status.as_u16(), "federation error"); + } else { + tracing::debug!(error = %self.0, status = status.as_u16(), "federation response"); + } + let body = if status.is_server_error() { + "internal server error".to_string() + } else { + self.0.to_string() + }; + (status, body).into_response() + } +} diff --git a/crates/adapters/activitypub-base/src/federation.rs b/crates/adapters/activitypub-base/src/federation.rs new file mode 100644 index 0000000..5ccd975 --- /dev/null +++ b/crates/adapters/activitypub-base/src/federation.rs @@ -0,0 +1,50 @@ +use activitypub_federation::config::{Data, FederationConfig, FederationMiddleware, UrlVerifier}; +use activitypub_federation::error::Error as FedError; +use url::Url; + +use crate::data::FederationData; + +#[derive(Clone)] +struct PermissiveVerifier; + +#[async_trait::async_trait] +impl UrlVerifier for PermissiveVerifier { + async fn verify(&self, _url: &Url) -> Result<(), FedError> { + Ok(()) + } +} + +#[derive(Clone)] +pub struct ApFederationConfig(pub FederationConfig); + +impl ApFederationConfig { + pub async fn new(data: FederationData, debug: bool) -> anyhow::Result { + let config = if debug { + FederationConfig::builder() + .domain(&data.domain) + .app_data(data) + .debug(true) + .http_signature_compat(true) + .url_verifier(Box::new(PermissiveVerifier)) + .build() + .await? + } else { + FederationConfig::builder() + .domain(&data.domain) + .app_data(data) + .debug(false) + .http_signature_compat(true) + .build() + .await? + }; + Ok(Self(config)) + } + + pub fn to_request_data(&self) -> Data { + self.0.to_request_data() + } + + pub fn middleware(&self) -> FederationMiddleware { + FederationMiddleware::new(self.0.clone()) + } +} diff --git a/crates/adapters/activitypub-base/src/followers_handler.rs b/crates/adapters/activitypub-base/src/followers_handler.rs new file mode 100644 index 0000000..bfc6658 --- /dev/null +++ b/crates/adapters/activitypub-base/src/followers_handler.rs @@ -0,0 +1,71 @@ +use activitypub_federation::{axum::json::FederationJson, config::Data}; +use axum::extract::Path; +use serde_json::json; + +use crate::data::FederationData; +use crate::error::Error; +use crate::repository::FollowerStatus; + +fn ordered_collection(id: String, total: usize, items: Vec) -> serde_json::Value { + json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": id, + "totalItems": total, + "orderedItems": items, + }) +} + +pub async fn followers_handler( + Path(user_id_str): Path, + data: Data, +) -> Result, Error> { + let user_id = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; + + data.user_repo + .find_by_id(user_id) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let followers = data + .federation_repo + .get_followers(user_id) + .await + .map_err(Error::from)?; + + let items: Vec = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .map(|f| f.actor.url) + .collect(); + + let id = format!("{}/users/{}/followers", data.base_url, user_id_str); + Ok(FederationJson(ordered_collection(id, items.len(), items))) +} + +pub async fn following_handler( + Path(user_id_str): Path, + data: Data, +) -> Result, Error> { + let user_id = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; + + data.user_repo + .find_by_id(user_id) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let following = data + .federation_repo + .get_following(user_id) + .await + .map_err(Error::from)?; + + let items: Vec = following.into_iter().map(|a| a.url).collect(); + + let id = format!("{}/users/{}/following", data.base_url, user_id_str); + Ok(FederationJson(ordered_collection(id, items.len(), items))) +} diff --git a/crates/adapters/activitypub-base/src/inbox.rs b/crates/adapters/activitypub-base/src/inbox.rs new file mode 100644 index 0000000..acee339 --- /dev/null +++ b/crates/adapters/activitypub-base/src/inbox.rs @@ -0,0 +1,20 @@ +use activitypub_federation::{ + axum::inbox::{receive_activity, ActivityData}, + config::Data, + protocol::context::WithContext, +}; + +use crate::activities::InboxActivities; +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +pub async fn inbox_handler( + data: Data, + activity_data: ActivityData, +) -> Result<(), Error> { + receive_activity::, DbActor, FederationData>( + activity_data, &data, + ) + .await +} diff --git a/crates/adapters/activitypub-base/src/lib.rs b/crates/adapters/activitypub-base/src/lib.rs new file mode 100644 index 0000000..816c94e --- /dev/null +++ b/crates/adapters/activitypub-base/src/lib.rs @@ -0,0 +1,23 @@ +pub mod activities; +pub mod actor_handler; +pub mod actors; +pub mod content; +pub mod data; +pub mod error; +pub mod federation; +pub mod followers_handler; +pub mod inbox; +pub mod outbox; +pub mod repository; +pub mod service; +pub mod user; +pub mod webfinger; +pub(crate) mod urls; + +pub use content::ApObjectHandler; +pub use data::FederationData; +pub use error::Error; +pub use federation::ApFederationConfig; +pub use repository::{FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor}; +pub use service::ActivityPubService; +pub use user::{ApUser, ApUserRepository}; diff --git a/crates/adapters/activitypub-base/src/outbox.rs b/crates/adapters/activitypub-base/src/outbox.rs new file mode 100644 index 0000000..1c1b385 --- /dev/null +++ b/crates/adapters/activitypub-base/src/outbox.rs @@ -0,0 +1,48 @@ +use activitypub_federation::{axum::json::FederationJson, config::Data}; +use axum::extract::Path; +use serde::{Deserialize, Serialize}; + +use crate::data::FederationData; +use crate::error::Error; + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrderedCollection { + #[serde(rename = "@context")] + context: String, + #[serde(rename = "type")] + kind: String, + id: String, + total_items: u64, + ordered_items: Vec, +} + +pub async fn outbox_handler( + Path(user_id_str): Path, + data: Data, +) -> Result, Error> { + let uuid = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; + + data.user_repo + .find_by_id(uuid) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let objects = data + .object_handler + .get_local_objects_for_user(uuid) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + + let outbox_url = format!("{}/users/{}/outbox", data.base_url, user_id_str); + + Ok(FederationJson(OrderedCollection { + context: "https://www.w3.org/ns/activitystreams".to_string(), + kind: "OrderedCollection".to_string(), + id: outbox_url, + total_items: objects.len() as u64, + ordered_items: vec![], + })) +} diff --git a/crates/adapters/activitypub-base/src/repository.rs b/crates/adapters/activitypub-base/src/repository.rs new file mode 100644 index 0000000..02f4263 --- /dev/null +++ b/crates/adapters/activitypub-base/src/repository.rs @@ -0,0 +1,50 @@ +use anyhow::Result; +use async_trait::async_trait; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FollowerStatus { + Pending, + Accepted, + Rejected, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FollowingStatus { + Pending, + Accepted, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteActor { + pub url: String, + pub handle: String, + pub inbox_url: String, + pub shared_inbox_url: Option, + pub display_name: Option, +} + +#[derive(Debug, Clone)] +pub struct Follower { + pub actor: RemoteActor, + pub status: FollowerStatus, +} + +#[async_trait] +pub trait FederationRepository: Send + Sync { + async fn add_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus, follow_activity_id: &str) -> Result<()>; + async fn get_follower_follow_activity_id(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result>; + async fn remove_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<()>; + async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result>; + async fn update_follower_status(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus) -> Result<()>; + async fn add_following(&self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str) -> Result<()>; + async fn get_follow_activity_id(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result>; + async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()>; + async fn get_following(&self, local_user_id: uuid::Uuid) -> Result>; + async fn count_following(&self, local_user_id: uuid::Uuid) -> Result; + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>; + async fn get_remote_actor(&self, actor_url: &str) -> Result>; + async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result>; + async fn save_local_actor_keypair(&self, user_id: uuid::Uuid, public_key: String, private_key: String) -> Result<()>; + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result>; + async fn update_following_status(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowingStatus) -> Result<()>; +} diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs new file mode 100644 index 0000000..711230e --- /dev/null +++ b/crates/adapters/activitypub-base/src/service.rs @@ -0,0 +1,470 @@ +use std::sync::Arc; + +use activitypub_federation::{ + activity_sending::SendActivityTask, + fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor}, + protocol::context::WithContext, + traits::Actor, +}; +use axum::{routing::get, routing::post, Router}; +use url::Url; + +use crate::{ + activities::{AcceptActivity, CreateActivity, FollowActivity, RejectActivity, UndoActivity}, + actor_handler::actor_handler, + actors::{get_local_actor, DbActor}, + content::ApObjectHandler, + data::FederationData, + federation::ApFederationConfig, + followers_handler::{followers_handler, following_handler}, + inbox::inbox_handler, + outbox::outbox_handler, + repository::{FederationRepository, FollowerStatus, RemoteActor}, + user::ApUserRepository, + urls::activity_url, + webfinger::webfinger_handler, +}; + +pub(crate) async fn send_with_retry( + sends: Vec, + data: &activitypub_federation::config::Data, +) -> Vec { + let mut failures = vec![]; + for send in sends { + let mut delay = std::time::Duration::from_secs(1); + for attempt in 1..=3u32 { + match send.clone().sign_and_send(data).await { + Ok(()) => break, + Err(e) if attempt < 3 => { + tracing::warn!(attempt, error = %e, "delivery failed, retrying"); + tokio::time::sleep(delay).await; + delay *= 2; + } + Err(e) => { + tracing::error!(attempt, error = %e, "delivery failed permanently"); + failures.push(anyhow::anyhow!(e)); + } + } + } + } + failures +} + +pub struct ActivityPubService { + federation_config: ApFederationConfig, + base_url: String, +} + +impl ActivityPubService { + pub async fn new( + repo: Arc, + user_repo: Arc, + object_handler: Arc, + base_url: String, + debug: bool, + ) -> anyhow::Result { + let data = FederationData::new(repo, user_repo, object_handler, base_url.clone()); + let federation_config = ApFederationConfig::new(data, debug).await?; + Ok(Self { federation_config, base_url }) + } + + pub fn federation_config(&self) -> &ApFederationConfig { + &self.federation_config + } + + pub fn request_data(&self) -> activitypub_federation::config::Data { + self.federation_config.to_request_data() + } + + pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result { + use activitypub_federation::traits::Object; + let uuid = uuid::Uuid::parse_str(user_id_str)?; + let data = self.federation_config.to_request_data(); + let actor = get_local_actor(uuid, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let person = actor.into_json(&data).await + .map_err(|e| anyhow::anyhow!("{e}"))?; + Ok(serde_json::to_string(&WithContext::new_default(person))?) + } + + pub fn router(&self) -> Router { + Router::new() + .route("/.well-known/webfinger", get(webfinger_handler)) + .route("/users/{user_id}", get(actor_handler)) + .route("/users/{user_id}/inbox", post(inbox_handler)) + .route("/users/{user_id}/outbox", get(outbox_handler)) + .route("/users/{user_id}/followers", get(followers_handler)) + .route("/users/{user_id}/following", get(following_handler)) + .layer(self.federation_config.middleware()) + } + + pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + + let remote_actor: DbActor = webfinger_resolve_actor(handle, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let follow_id_str = follow_id.to_string(); + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_actor.ap_id.clone()), + }; + let follow_with_ctx = WithContext::new_default(follow); + + let sends = SendActivityTask::prepare( + &follow_with_ctx, + &local_actor, + vec![remote_actor.inbox()], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some activity deliveries failed permanently"); + } + + let remote = RemoteActor { + url: remote_actor.ap_id.to_string(), + handle: remote_actor.username.clone(), + inbox_url: remote_actor.inbox_url.to_string(), + shared_inbox_url: None, + display_name: Some(remote_actor.username.clone()), + }; + data.federation_repo + .add_following(local_user_id, remote, &follow_id_str) + .await?; + + Ok(()) + } + + pub async fn unfollow(&self, local_user_id: uuid::Uuid, actor_url_str: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + + let remote = data + .federation_repo + .get_remote_actor(actor_url_str) + .await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; + + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let remote_ap_id = Url::parse(actor_url_str)?; + let inbox = Url::parse(&remote.inbox_url)?; + + let follow_activity_id_str = data + .federation_repo + .get_follow_activity_id(local_user_id, actor_url_str) + .await?; + let follow_id = match follow_activity_id_str { + Some(id) => Url::parse(&id)?, + None => activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + }; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_ap_id), + }; + + let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let undo = UndoActivity { + id: undo_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: follow, + }; + + let sends = SendActivityTask::prepare( + &WithContext::new_default(undo), + &local_actor, + vec![inbox], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some activity deliveries failed permanently"); + } + + data.federation_repo + .remove_following(local_user_id, actor_url_str) + .await?; + + data.object_handler + .on_actor_removed(&Url::parse(actor_url_str)?) + .await?; + + Ok(()) + } + + pub async fn accept_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let remote_actor = data + .federation_repo + .get_remote_actor(remote_actor_url) + .await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; + + let follow_id_str = data + .federation_repo + .get_follower_follow_activity_id(local_user_id, remote_actor_url) + .await? + .ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?; + let follow_id = Url::parse(&follow_id_str)?; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(Url::parse(remote_actor_url)?), + object: ObjectId::from(local_actor.ap_id.clone()), + }; + let accept = AcceptActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: follow, + }; + + data.federation_repo + .update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted) + .await?; + + let inbox = Url::parse(&remote_actor.inbox_url)?; + let sends = SendActivityTask::prepare( + &WithContext::new_default(accept), + &local_actor, + vec![inbox.clone()], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!("failed to deliver Accept activity, but follower is marked accepted locally"); + } + + self.spawn_backfill(local_user_id, remote_actor.inbox_url.clone()); + + Ok(()) + } + + pub async fn reject_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let remote_actor = data + .federation_repo + .get_remote_actor(remote_actor_url) + .await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; + + let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(Url::parse(remote_actor_url)?), + object: ObjectId::from(local_actor.ap_id.clone()), + }; + let reject = RejectActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: follow, + }; + + let inbox = Url::parse(&remote_actor.inbox_url)?; + let sends = SendActivityTask::prepare( + &WithContext::new_default(reject), + &local_actor, + vec![inbox], + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some activity deliveries failed permanently"); + } + + data.federation_repo + .remove_follower(local_user_id, remote_actor_url) + .await?; + + Ok(()) + } + + pub async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo.get_pending_followers(local_user_id).await + } + + pub async fn get_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + let followers = data.federation_repo.get_followers(local_user_id).await?; + Ok(followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .map(|f| f.actor) + .collect()) + } + + pub async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result { + let data = self.federation_config.to_request_data(); + let followers = data.federation_repo.get_followers(local_user_id).await?; + Ok(followers.into_iter().filter(|f| f.status == FollowerStatus::Accepted).count()) + } + + pub async fn get_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo.get_following(local_user_id).await + } + + pub async fn count_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result { + let data = self.federation_config.to_request_data(); + data.federation_repo.count_following(local_user_id).await + } + + pub async fn remove_follower(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.remove_follower(local_user_id, actor_url).await + } + + /// Broadcast a single object to all accepted followers as a Create activity. + /// Called by project-specific event handlers when new content is created. + pub async fn broadcast_to_followers( + &self, + local_user_id: uuid::Uuid, + ap_id: Url, + object: serde_json::Value, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let create = CreateActivity { + id: ap_id.clone(), + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object, + }; + let create_with_ctx = WithContext::new_default(create); + + let inboxes: Vec = accepted + .iter() + .filter_map(|f| Url::parse(&f.actor.inbox_url).ok()) + .collect(); + + let sends = SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some activity deliveries failed permanently"); + } + + Ok(()) + } + + fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { + let config = self.federation_config.clone(); + let base_url = self.base_url.clone(); + tokio::spawn(async move { + if let Err(e) = ActivityPubService::run_backfill(config, base_url, owner_user_id, follower_inbox_url).await { + tracing::warn!(error = %e, "backfill: task failed"); + } + }); + } + + async fn run_backfill( + config: ApFederationConfig, + base_url: String, + owner_user_id: uuid::Uuid, + follower_inbox_url: String, + ) -> anyhow::Result<()> { + const BATCH_SIZE: usize = 20; + + let data = config.to_request_data(); + let local_actor = get_local_actor(owner_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let inbox = Url::parse(&follower_inbox_url)?; + + let mut objects = data.object_handler.get_local_objects_for_user(owner_user_id).await?; + objects.reverse(); // oldest first → chronological feed + + let total = objects.len(); + let mut success_count = 0usize; + let mut failure_count = 0usize; + + for chunk in objects.chunks(BATCH_SIZE) { + for (ap_id, object_json) in chunk { + // Use a stable Create activity ID derived from the object's ap_id + let create_id = Url::parse(&format!("{}/activities/create/{}", base_url, + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes()) + ))?; + + let create = CreateActivity { + id: create_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: object_json.clone(), + }; + + let sends = SendActivityTask::prepare( + &WithContext::new_default(create), + &local_actor, + vec![inbox.clone()], + &data, + ).await?; + let failures = send_with_retry(sends, &data).await; + if failures.is_empty() { + success_count += 1; + } else { + failure_count += 1; + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + tracing::info!( + user_id = %owner_user_id, + follower = %follower_inbox_url, + sent = success_count, + failed = failure_count, + total = total, + "backfill complete" + ); + Ok(()) + } +} diff --git a/crates/adapters/activitypub-base/src/urls.rs b/crates/adapters/activitypub-base/src/urls.rs new file mode 100644 index 0000000..209fff0 --- /dev/null +++ b/crates/adapters/activitypub-base/src/urls.rs @@ -0,0 +1,20 @@ +use url::Url; + +use crate::error::Error; + +pub fn extract_user_id_from_url(url: &Url) -> Option { + let path = url.path(); + path.strip_prefix("/users/") + .and_then(|s| s.split('/').next()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) +} + +pub fn activity_url(base_url: &str) -> Result { + Url::parse(&format!("{}/activities/{}", base_url, uuid::Uuid::new_v4())) + .map_err(|e| Error::bad_request(anyhow::anyhow!(e))) +} + +pub fn actor_url(base_url: &str, user_id: uuid::Uuid) -> Url { + Url::parse(&format!("{}/users/{}", base_url, user_id)) + .expect("base_url is always a valid URL prefix") +} diff --git a/crates/adapters/activitypub-base/src/user.rs b/crates/adapters/activitypub-base/src/user.rs new file mode 100644 index 0000000..3120dfe --- /dev/null +++ b/crates/adapters/activitypub-base/src/user.rs @@ -0,0 +1,13 @@ +use async_trait::async_trait; + +#[derive(Debug, Clone)] +pub struct ApUser { + pub id: uuid::Uuid, + pub username: String, +} + +#[async_trait] +pub trait ApUserRepository: Send + Sync { + async fn find_by_id(&self, id: uuid::Uuid) -> anyhow::Result>; + async fn find_by_username(&self, username: &str) -> anyhow::Result>; +} diff --git a/crates/adapters/activitypub-base/src/webfinger.rs b/crates/adapters/activitypub-base/src/webfinger.rs new file mode 100644 index 0000000..3a5c012 --- /dev/null +++ b/crates/adapters/activitypub-base/src/webfinger.rs @@ -0,0 +1,42 @@ +use activitypub_federation::{ + config::Data, + fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger}, +}; +use axum::{ + extract::Query, + http::header, + response::{IntoResponse, Response}, +}; +use serde::Deserialize; + +use crate::data::FederationData; +use crate::error::Error; + +#[derive(Deserialize)] +pub struct WebfingerQuery { + resource: String, +} + +pub async fn webfinger_handler( + Query(query): Query, + data: Data, +) -> Result { + let name = extract_webfinger_name(&query.resource, &data)?; + + let user = data + .user_repo + .find_by_username(name) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let ap_id = crate::urls::actor_url(&data.base_url, user.id); + + let wf: Webfinger = build_webfinger_response(query.resource, ap_id); + let body = serde_json::to_string(&wf) + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + Ok(( + [(header::CONTENT_TYPE, "application/jrd+json")], + body, + ).into_response()) +} diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml index ca2a57a..b6abab3 100644 --- a/crates/adapters/activitypub/Cargo.toml +++ b/crates/adapters/activitypub/Cargo.toml @@ -4,15 +4,13 @@ version = "0.1.0" edition = "2024" [dependencies] +activitypub-base = { workspace = true } domain = { workspace = true } -application = { workspace = true } tokio = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -reqwest = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } -thiserror = { workspace = true } anyhow = { workspace = true } tracing = { workspace = true } async-trait = { workspace = true } @@ -20,5 +18,3 @@ event-publisher = { workspace = true } activitypub_federation = "0.7.0-beta.11" url = { version = "2", features = ["serde"] } -enum_delegate = "0.2" -axum = "0.8" diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 68e5a5f..50a6591 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -1,37 +1,31 @@ -use activitypub_federation::{ - activity_sending::SendActivityTask, - fetch::object_id::ObjectId, - protocol::context::WithContext, - traits::Object, -}; use async_trait::async_trait; use domain::{ errors::DomainError, events::DomainEvent, + ports::MovieRepository, value_objects::{ReviewId, UserId}, }; use event_publisher::EventHandler; -use url::Url; +use std::sync::Arc; -use crate::{ - activities::CreateActivity, - actors::get_local_actor, - federation::ApFederationConfig, - objects::DbReview, - repository::FollowerStatus, -}; +use activitypub_base::ActivityPubService; + +use crate::objects::review_to_ap_object; +use crate::urls::{actor_url, review_url}; pub struct ActivityPubEventHandler { - federation_config: ApFederationConfig, + ap_service: Arc, + movie_repo: Arc, base_url: String, } impl ActivityPubEventHandler { - pub fn new(federation_config: ApFederationConfig, base_url: String) -> Self { - Self { - federation_config, - base_url, - } + pub fn new( + ap_service: Arc, + movie_repo: Arc, + base_url: String, + ) -> Self { + Self { ap_service, movie_repo, base_url } } } @@ -39,11 +33,7 @@ impl ActivityPubEventHandler { impl EventHandler for ActivityPubEventHandler { async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { match event { - DomainEvent::ReviewLogged { - review_id, - user_id, - .. - } => self + DomainEvent::ReviewLogged { review_id, user_id, .. } => self .on_review_logged(user_id, review_id) .await .map_err(|e| DomainError::InfrastructureError(e.to_string())), @@ -58,70 +48,29 @@ impl ActivityPubEventHandler { user_id: &UserId, review_id: &ReviewId, ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - let followers = data.federation_repo.get_followers(user_id.clone()).await?; - tracing::debug!(user_id = %user_id.value(), count = followers.len(), "AP: got followers for review"); - - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .collect(); - - tracing::debug!(accepted = accepted.len(), "AP: accepted followers"); - - if accepted.is_empty() { - return Ok(()); - } - - let review = match data.movie_repo.get_review_by_id(review_id).await? { + let review = match self.movie_repo.get_review_by_id(review_id).await? { Some(r) => r, None => return Ok(()), }; - let local_actor = get_local_actor(user_id.clone(), &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; + let ap_id = review_url(&self.base_url, review_id); + let actor = actor_url(&self.base_url, user_id.value()); - let activity_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let movie = self.movie_repo.get_movie_by_id(review.movie_id()).await.ok().flatten(); + let movie_title = movie.as_ref() + .map(|m| m.title().value().to_string()) + .unwrap_or_else(|| "Unknown".to_string()); + let release_year = movie.as_ref().map(|m| m.release_year().value()).unwrap_or(0); + let poster_url = movie.as_ref() + .and_then(|m| m.poster_path()) + .map(|p| format!("{}/posters/{}", self.base_url, p.value())); - let db_review = DbReview { - ap_id: crate::urls::review_url(&self.base_url, review_id), - review, - }; - let object = db_review - .into_json(&data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; + let obj = review_to_ap_object(&review, ap_id.clone(), actor, movie_title, release_year, poster_url); + let json = serde_json::to_value(obj)?; - let create = CreateActivity { - id: activity_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object, - }; - let create_with_ctx = WithContext::new_default(create); - - let inboxes: Vec = accepted - .iter() - .filter_map(|f| { - let url = Url::parse(&f.actor.inbox_url); - if url.is_err() { - tracing::warn!(inbox = %f.actor.inbox_url, "AP: invalid inbox URL, skipping follower"); - } - url.ok() - }) - .collect(); - - tracing::debug!(inboxes = inboxes.len(), "AP: delivering to inboxes"); - - let sends = - SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?; - tracing::debug!(sends = sends.len(), "AP: prepared sends"); - let failures = crate::service::send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some activity deliveries failed permanently"); - } + self.ap_service + .broadcast_to_followers(user_id.value(), ap_id, json) + .await?; Ok(()) } diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index c678093..a276992 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -1,22 +1,17 @@ -pub mod activities; -pub mod actor_handler; -pub mod actors; -pub mod data; -pub mod error; pub mod event_handler; -pub mod federation; -pub mod followers_handler; -pub mod inbox; pub mod objects; -pub mod outbox; -pub mod repository; -pub mod service; +pub mod remote_review_repository; +pub mod review_handler; +pub mod user_adapter; pub(crate) mod urls; -pub mod webfinger; -pub use data::FederationData; -pub use error::Error; +// Re-export the generic base types that callers need +pub use activitypub_base::{ + ActivityPubService, ApFederationConfig, ApObjectHandler, ApUser, ApUserRepository, + FederationData, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, +}; + pub use event_handler::ActivityPubEventHandler; -pub use federation::ApFederationConfig; -pub use repository::{FederationRepository, Follower, FollowerStatus, RemoteActor}; -pub use service::ActivityPubService; +pub use remote_review_repository::RemoteReviewRepository; +pub use review_handler::ReviewObjectHandler; +pub use user_adapter::DomainUserRepoAdapter; diff --git a/crates/adapters/activitypub/src/objects.rs b/crates/adapters/activitypub/src/objects.rs index 77089e4..f974b9e 100644 --- a/crates/adapters/activitypub/src/objects.rs +++ b/crates/adapters/activitypub/src/objects.rs @@ -1,33 +1,22 @@ -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::object::NoteType, - protocol::verification::verify_domains_match, - traits::Object, -}; +use activitypub_federation::kinds::object::NoteType; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use url::Url; -use domain::models::{Review, ReviewSource}; -use domain::value_objects::ReviewId; - -use crate::actors::DbActor; -use crate::data::FederationData; -use crate::error::Error; +use domain::models::Review; #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct ReviewObject { #[serde(rename = "type")] pub(crate) kind: NoteType, - pub(crate) id: ObjectId, - pub(crate) attributed_to: ObjectId, + pub(crate) id: Url, + pub(crate) attributed_to: Url, pub(crate) content: String, pub(crate) published: DateTime, pub(crate) movie_title: String, #[serde(default)] - pub(crate) release_year: u16, // 0 = unknown; default for old AP messages + pub(crate) release_year: u16, #[serde(default)] pub(crate) poster_url: Option, pub(crate) rating: u8, @@ -35,115 +24,36 @@ pub struct ReviewObject { pub(crate) watched_at: DateTime, } -#[derive(Debug, Clone)] -pub struct DbReview { - pub review: Review, - pub ap_id: Url, -} +/// Serialize a local Review into a ReviewObject for AP delivery. +/// Takes movie metadata explicitly since the handler fetches it separately. +pub fn review_to_ap_object( + review: &Review, + ap_id: Url, + actor_url: Url, + movie_title: String, + release_year: u16, + poster_url: Option, +) -> ReviewObject { + let stars: String = "\u{2B50}".repeat(review.rating().value() as usize); + let comment_text = review.comment().map(|c| c.value().to_string()); + let year_str = if release_year > 0 { format!(" ({})", release_year) } else { String::new() }; + let watched_str = format!("Watched: {}", review.watched_at().format("%b %-d, %Y")); + let content = match &comment_text { + Some(c) => format!("{} {}{}\n{}\n{}", stars, movie_title, year_str, c, watched_str), + None => format!("{} {}{}\n{}", stars, movie_title, year_str, watched_str), + }; -#[async_trait::async_trait] -impl Object for DbReview { - type DataType = FederationData; - type Kind = ReviewObject; - type Error = Error; - - fn id(&self) -> &Url { - &self.ap_id - } - - async fn read_from_id( - _object_id: Url, - _data: &Data, - ) -> Result, Self::Error> { - // Incoming activities provide the full object; no need to dereference local reviews - Ok(None) - } - - async fn into_json(self, data: &Data) -> Result { - let r = &self.review; - let ap_id = crate::urls::review_url(&data.base_url, r.id()); - let actor_url = crate::urls::actor_url(&data.base_url, r.user_id()); - - let movie = data.movie_repo.get_movie_by_id(r.movie_id()).await - .ok().flatten(); - let movie_title = movie.as_ref() - .map(|m| m.title().value().to_string()) - .unwrap_or_else(|| "Unknown".to_string()); - let release_year = movie.as_ref() - .map(|m| m.release_year().value()) - .unwrap_or(0); - let poster_url = movie.as_ref() - .and_then(|m| m.poster_path()) - .map(|p| format!("{}/posters/{}", data.base_url, p.value())); - - let stars: String = "\u{2B50}".repeat(r.rating().value() as usize); - let comment_text = r.comment().map(|c| c.value().to_string()); - let year_str = if release_year > 0 { format!(" ({})", release_year) } else { String::new() }; - let watched_str = format!("Watched: {}", r.watched_at().format("%b %-d, %Y")); - let content = match &comment_text { - Some(c) => format!("{} {}{}\n{}\n{}", stars, movie_title, year_str, c, watched_str), - None => format!("{} {}{}\n{}", stars, movie_title, year_str, watched_str), - }; - - Ok(ReviewObject { - kind: NoteType::default(), - id: ap_id.into(), - attributed_to: actor_url.into(), - content, - published: DateTime::from_naive_utc_and_offset(*r.created_at(), Utc), - movie_title, - release_year, - poster_url, - rating: r.rating().value(), - comment: comment_text, - watched_at: DateTime::from_naive_utc_and_offset(*r.watched_at(), Utc), - }) - } - - async fn verify( - json: &Self::Kind, - expected_domain: &Url, - _data: &Data, - ) -> Result<(), Self::Error> { - verify_domains_match(json.attributed_to.inner(), expected_domain)?; - Ok(()) - } - - async fn from_json( - json: Self::Kind, - data: &Data, - ) -> Result { - let actor_url = json.attributed_to.inner().to_string(); - - let review_id = ReviewId::generate(); - let movie_id_uuid = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, json.movie_title.as_bytes()); - let movie_id = domain::value_objects::MovieId::from_uuid(movie_id_uuid); - let user_id_uuid = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, actor_url.as_bytes()); - let user_id = domain::value_objects::UserId::from_uuid(user_id_uuid); - let rating = domain::value_objects::Rating::new(json.rating.min(5)) - .map_err(|e| Error::bad_request(anyhow::anyhow!("{}", e)))?; - let comment = json - .comment - .map(|c| domain::value_objects::Comment::new(c)) - .transpose() - .map_err(|e| Error::bad_request(anyhow::anyhow!("{}", e)))?; - let watched_at = json.watched_at.naive_utc(); - let created_at = json.published.naive_utc(); - - let review = Review::from_persistence( - review_id, - movie_id, - user_id, - rating, - comment, - watched_at, - created_at, - ReviewSource::Remote { actor_url }, - ); - - let ap_id_url = json.id.into_inner(); - data.federation_repo.save_remote_review(&review, ap_id_url.as_str(), &json.movie_title, json.release_year, json.poster_url.as_deref()).await?; - - Ok(DbReview { review, ap_id: ap_id_url }) + ReviewObject { + kind: NoteType::default(), + id: ap_id, + attributed_to: actor_url, + content, + published: DateTime::from_naive_utc_and_offset(*review.created_at(), Utc), + movie_title, + release_year, + poster_url, + rating: review.rating().value(), + comment: comment_text, + watched_at: DateTime::from_naive_utc_and_offset(*review.watched_at(), Utc), } } diff --git a/crates/adapters/activitypub/src/remote_review_repository.rs b/crates/adapters/activitypub/src/remote_review_repository.rs new file mode 100644 index 0000000..5970ce8 --- /dev/null +++ b/crates/adapters/activitypub/src/remote_review_repository.rs @@ -0,0 +1,29 @@ +use anyhow::Result; +use async_trait::async_trait; +use chrono::NaiveDateTime; +use domain::models::Review; + +#[async_trait] +pub trait RemoteReviewRepository: Send + Sync { + async fn save_remote_review( + &self, + review: &Review, + ap_id: &str, + movie_title: &str, + release_year: u16, + poster_url: Option<&str>, + ) -> Result<()>; + + async fn delete_remote_review(&self, ap_id: &str, actor_url: &str) -> Result<()>; + + async fn update_remote_review( + &self, + ap_id: &str, + actor_url: &str, + rating: u8, + comment: Option<&str>, + watched_at: NaiveDateTime, + ) -> Result<()>; + + async fn delete_by_actor(&self, actor_url: &str) -> Result<()>; +} diff --git a/crates/adapters/activitypub/src/review_handler.rs b/crates/adapters/activitypub/src/review_handler.rs new file mode 100644 index 0000000..ad78cd0 --- /dev/null +++ b/crates/adapters/activitypub/src/review_handler.rs @@ -0,0 +1,138 @@ +use std::sync::Arc; + +use activitypub_base::ApObjectHandler; +use async_trait::async_trait; +use domain::{ + models::{Review, ReviewSource}, + ports::MovieRepository, + value_objects::{Comment, MovieId, Rating, ReviewId, UserId}, +}; +use url::Url; + +use crate::objects::{review_to_ap_object, ReviewObject}; +use crate::remote_review_repository::RemoteReviewRepository; +use crate::urls::{actor_url, review_url}; + +pub struct ReviewObjectHandler { + pub movie_repo: Arc, + pub review_store: Arc, + pub base_url: String, +} + +#[async_trait] +impl ApObjectHandler for ReviewObjectHandler { + async fn get_local_objects_for_user( + &self, + user_id: uuid::Uuid, + ) -> anyhow::Result> { + let domain_user_id = UserId::from_uuid(user_id); + let history = self.movie_repo.get_user_history(&domain_user_id).await?; + + let mut results = Vec::new(); + for entry in history { + let review = entry.review(); + if !matches!(review.source(), ReviewSource::Local) { + continue; + } + + let ap_id = review_url(&self.base_url, review.id()); + let actor_url = actor_url(&self.base_url, user_id); + + let movie = self.movie_repo.get_movie_by_id(review.movie_id()).await.ok().flatten(); + let movie_title = movie.as_ref() + .map(|m| m.title().value().to_string()) + .unwrap_or_else(|| "Unknown".to_string()); + let release_year = movie.as_ref() + .map(|m| m.release_year().value()) + .unwrap_or(0); + let poster_url = movie.as_ref() + .and_then(|m| m.poster_path()) + .map(|p| format!("{}/posters/{}", self.base_url, p.value())); + + let obj = review_to_ap_object(review, ap_id.clone(), actor_url, movie_title, release_year, poster_url); + let json = serde_json::to_value(obj)?; + results.push((ap_id, json)); + } + Ok(results) + } + + async fn on_create( + &self, + _ap_id: &Url, + _actor_url: &Url, + object: serde_json::Value, + ) -> anyhow::Result<()> { + let obj: ReviewObject = match serde_json::from_value(object) { + Ok(o) => o, + Err(e) => { + tracing::debug!("ignoring unrecognized Create object: {}", e); + return Ok(()); + } + }; + + let actor_url_str = obj.attributed_to.to_string(); + let review_id = ReviewId::generate(); + let movie_id = MovieId::from_uuid(uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, obj.movie_title.as_bytes())); + let user_id = UserId::from_uuid(uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, actor_url_str.as_bytes())); + let rating = Rating::new(obj.rating.min(5))?; + let comment = obj.comment.map(Comment::new).transpose()?; + + let review = Review::from_persistence( + review_id, + movie_id, + user_id, + rating, + comment, + obj.watched_at.naive_utc(), + obj.published.naive_utc(), + ReviewSource::Remote { actor_url: actor_url_str }, + ); + + self.review_store + .save_remote_review(&review, obj.id.as_str(), &obj.movie_title, obj.release_year, obj.poster_url.as_deref()) + .await?; + + Ok(()) + } + + async fn on_update( + &self, + ap_id: &Url, + actor_url: &Url, + object: serde_json::Value, + ) -> anyhow::Result<()> { + let obj: ReviewObject = match serde_json::from_value(object) { + Ok(o) => o, + Err(_) => { + tracing::debug!(actor = %actor_url, "ignoring non-review Update activity"); + return Ok(()); + } + }; + + if obj.attributed_to != *actor_url { + anyhow::bail!("update actor does not match object attributed_to"); + } + + self.review_store + .update_remote_review( + ap_id.as_str(), + actor_url.as_str(), + obj.rating.min(5), + obj.comment.as_deref(), + obj.watched_at.naive_utc(), + ) + .await?; + + Ok(()) + } + + async fn on_delete(&self, ap_id: &Url, actor_url: &Url) -> anyhow::Result<()> { + self.review_store + .delete_remote_review(ap_id.as_str(), actor_url.as_str()) + .await + } + + async fn on_actor_removed(&self, actor_url: &Url) -> anyhow::Result<()> { + self.review_store.delete_by_actor(actor_url.as_str()).await + } +} diff --git a/crates/adapters/activitypub/src/urls.rs b/crates/adapters/activitypub/src/urls.rs index deb1dcf..ce47ac4 100644 --- a/crates/adapters/activitypub/src/urls.rs +++ b/crates/adapters/activitypub/src/urls.rs @@ -1,26 +1,9 @@ use url::Url; -use uuid::Uuid; -use domain::value_objects::{UserId, ReviewId}; -use crate::error::Error; - -/// Extracts a UserId from a URL like `https://example.com/users/{uuid}[/...]` -pub fn extract_user_id_from_url(url: &Url) -> Option { - let path = url.path(); - path.strip_prefix("/users/") - .and_then(|s| s.split('/').next()) - .and_then(|uid_str| Uuid::parse_str(uid_str).ok()) - .map(UserId::from_uuid) -} - -/// Generates a fresh activity URL: `{base_url}/activities/{uuid}` -pub fn activity_url(base_url: &str) -> Result { - Url::parse(&format!("{}/activities/{}", base_url, Uuid::new_v4())) - .map_err(|e| Error::bad_request(anyhow::anyhow!(e))) -} +use domain::value_objects::ReviewId; /// Builds the canonical actor URL: `{base_url}/users/{user_id}` -pub fn actor_url(base_url: &str, user_id: &UserId) -> Url { - Url::parse(&format!("{}/users/{}", base_url, user_id.value())) +pub fn actor_url(base_url: &str, user_id: uuid::Uuid) -> Url { + Url::parse(&format!("{}/users/{}", base_url, user_id)) .expect("base_url is always a valid URL prefix") } @@ -29,10 +12,3 @@ pub fn review_url(base_url: &str, review_id: &ReviewId) -> Url { Url::parse(&format!("{}/reviews/{}", base_url, review_id.value())) .expect("base_url is always a valid URL prefix") } - -/// Stable Create-activity URL derived from review ID. -/// Deterministic so repeated backfills to different followers don't create duplicate posts. -pub fn create_activity_url(base_url: &str, review_id: &ReviewId) -> Result { - Url::parse(&format!("{}/activities/create/{}", base_url, review_id.value())) - .map_err(|e| Error::bad_request(anyhow::anyhow!(e))) -} diff --git a/crates/adapters/activitypub/src/user_adapter.rs b/crates/adapters/activitypub/src/user_adapter.rs new file mode 100644 index 0000000..bd4557c --- /dev/null +++ b/crates/adapters/activitypub/src/user_adapter.rs @@ -0,0 +1,28 @@ +use std::sync::Arc; + +use activitypub_base::{ApUser, ApUserRepository}; +use async_trait::async_trait; +use domain::{ports::UserRepository, value_objects::UserId}; + +pub struct DomainUserRepoAdapter(pub Arc); + +#[async_trait] +impl ApUserRepository for DomainUserRepoAdapter { + async fn find_by_id(&self, id: uuid::Uuid) -> anyhow::Result> { + let user_id = UserId::from_uuid(id); + Ok(self.0.find_by_id(&user_id).await?.map(|u| ApUser { + id: u.id().value(), + username: u.username().value().to_string(), + })) + } + + async fn find_by_username(&self, username: &str) -> anyhow::Result> { + use domain::value_objects::Username; + let uname = Username::new(username.to_string()) + .map_err(|e| anyhow::anyhow!(e.to_string()))?; + Ok(self.0.find_by_username(&uname).await?.map(|u| ApUser { + id: u.id().value(), + username: u.username().value().to_string(), + })) + } +} diff --git a/crates/adapters/sqlite/Cargo.toml b/crates/adapters/sqlite/Cargo.toml index f226e45..ff4b12f 100644 --- a/crates/adapters/sqlite/Cargo.toml +++ b/crates/adapters/sqlite/Cargo.toml @@ -13,6 +13,7 @@ sqlx = { version = "0.8.6", features = [ domain = { workspace = true } activitypub = { workspace = true } +activitypub-base = { workspace = true } anyhow = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } diff --git a/crates/adapters/sqlite/src/federation.rs b/crates/adapters/sqlite/src/federation.rs index b41a781..f337fbb 100644 --- a/crates/adapters/sqlite/src/federation.rs +++ b/crates/adapters/sqlite/src/federation.rs @@ -3,9 +3,9 @@ use async_trait::async_trait; use chrono::Utc; use sqlx::{Row, SqlitePool}; -use activitypub::repository::{FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor}; +use activitypub_base::{FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor}; +use activitypub::RemoteReviewRepository; use domain::models::{Review, ReviewSource}; -use domain::value_objects::UserId; use crate::models::datetime_to_str; @@ -39,12 +39,12 @@ fn str_to_status(s: &str) -> FollowerStatus { impl FederationRepository for SqliteFederationRepository { async fn add_follower( &self, - local_user_id: UserId, + local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus, follow_activity_id: &str, ) -> Result<()> { - let uid = local_user_id.value().to_string(); + let uid = local_user_id.to_string(); let status_str = status_to_str(&status); let now = Utc::now().naive_utc(); let created_at = datetime_to_str(&now); @@ -69,10 +69,10 @@ impl FederationRepository for SqliteFederationRepository { async fn get_follower_follow_activity_id( &self, - local_user_id: UserId, + local_user_id: uuid::Uuid, remote_actor_url: &str, ) -> Result> { - let uid = local_user_id.value().to_string(); + let uid = local_user_id.to_string(); let row: Option> = sqlx::query_scalar( "SELECT follow_activity_id FROM ap_followers WHERE local_user_id = ? AND remote_actor_url = ?", ) @@ -83,22 +83,18 @@ impl FederationRepository for SqliteFederationRepository { Ok(row.flatten()) } - async fn remove_follower(&self, local_user_id: UserId, remote_actor_url: &str) -> Result<()> { - let uid = local_user_id.value().to_string(); - - sqlx::query( - "DELETE FROM ap_followers WHERE local_user_id = ? AND remote_actor_url = ?", - ) - .bind(&uid) - .bind(remote_actor_url) - .execute(&self.pool) - .await?; - + async fn remove_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<()> { + let uid = local_user_id.to_string(); + sqlx::query("DELETE FROM ap_followers WHERE local_user_id = ? AND remote_actor_url = ?") + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; Ok(()) } - async fn get_followers(&self, local_user_id: UserId) -> Result> { - let uid = local_user_id.value().to_string(); + async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); let rows = sqlx::query( "SELECT f.remote_actor_url, f.status, @@ -122,13 +118,7 @@ impl FederationRepository for SqliteFederationRepository { let display_name: Option = row.try_get("display_name").ok().flatten(); Follower { - actor: RemoteActor { - url, - handle, - inbox_url, - shared_inbox_url, - display_name, - }, + actor: RemoteActor { url, handle, inbox_url, shared_inbox_url, display_name }, status: str_to_status(&status_str), } }) @@ -139,11 +129,11 @@ impl FederationRepository for SqliteFederationRepository { async fn update_follower_status( &self, - local_user_id: UserId, + local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus, ) -> Result<()> { - let uid = local_user_id.value().to_string(); + let uid = local_user_id.to_string(); let status_str = status_to_str(&status); let result = sqlx::query( @@ -156,18 +146,14 @@ impl FederationRepository for SqliteFederationRepository { .await?; if result.rows_affected() == 0 { - tracing::warn!( - local_user_id = %local_user_id.value(), - remote_actor_url = remote_actor_url, - "update_follower_status: no row found" - ); + tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_follower_status: no row found"); } Ok(()) } - async fn add_following(&self, local_user_id: UserId, actor: RemoteActor, follow_activity_id: &str) -> Result<()> { - let uid = local_user_id.value().to_string(); + async fn add_following(&self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str) -> Result<()> { + let uid = local_user_id.to_string(); let now = Utc::now().naive_utc(); let created_at = datetime_to_str(&now); @@ -187,8 +173,8 @@ impl FederationRepository for SqliteFederationRepository { Ok(()) } - async fn get_follow_activity_id(&self, local_user_id: UserId, remote_actor_url: &str) -> Result> { - let uid = local_user_id.value().to_string(); + async fn get_follow_activity_id(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result> { + let uid = local_user_id.to_string(); let row: Option> = sqlx::query_scalar( "SELECT follow_activity_id FROM ap_following WHERE local_user_id = ? AND remote_actor_url = ?", ) @@ -199,22 +185,18 @@ impl FederationRepository for SqliteFederationRepository { Ok(row.flatten()) } - async fn remove_following(&self, local_user_id: UserId, actor_url: &str) -> Result<()> { - let uid = local_user_id.value().to_string(); - - sqlx::query( - "DELETE FROM ap_following WHERE local_user_id = ? AND remote_actor_url = ?", - ) - .bind(&uid) - .bind(actor_url) - .execute(&self.pool) - .await?; - + async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { + let uid = local_user_id.to_string(); + sqlx::query("DELETE FROM ap_following WHERE local_user_id = ? AND remote_actor_url = ?") + .bind(&uid) + .bind(actor_url) + .execute(&self.pool) + .await?; Ok(()) } - async fn get_following(&self, local_user_id: UserId) -> Result> { - let uid = local_user_id.value().to_string(); + async fn get_following(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); let rows = sqlx::query( "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name @@ -226,22 +208,17 @@ impl FederationRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - let actors = rows - .into_iter() - .map(|row| RemoteActor { - url: row.get("url"), - handle: row.get("handle"), - inbox_url: row.get("inbox_url"), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - }) - .collect(); - - Ok(actors) + Ok(rows.into_iter().map(|row| RemoteActor { + url: row.get("url"), + handle: row.get("handle"), + inbox_url: row.get("inbox_url"), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + }).collect()) } - async fn count_following(&self, local_user_id: UserId) -> Result { - let uid = local_user_id.value().to_string(); + async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { + let uid = local_user_id.to_string(); let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM ap_following WHERE local_user_id = ? AND status = 'accepted'", ) @@ -295,20 +272,17 @@ impl FederationRepository for SqliteFederationRepository { })) } - async fn get_local_actor_keypair(&self, user_id: UserId) -> Result> { - let uid = user_id.value().to_string(); - let row = sqlx::query( - "SELECT public_key, private_key FROM ap_local_actors WHERE user_id = ?", - ) - .bind(&uid) - .fetch_optional(&self.pool) - .await?; - + async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result> { + let uid = user_id.to_string(); + let row = sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = ?") + .bind(&uid) + .fetch_optional(&self.pool) + .await?; Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) } - async fn save_local_actor_keypair(&self, user_id: UserId, public_key: String, private_key: String) -> Result<()> { - let uid = user_id.value().to_string(); + async fn save_local_actor_keypair(&self, user_id: uuid::Uuid, public_key: String, private_key: String) -> Result<()> { + let uid = user_id.to_string(); let now = Utc::now().naive_utc(); let created_at = datetime_to_str(&now); @@ -329,7 +303,70 @@ impl FederationRepository for SqliteFederationRepository { Ok(()) } - async fn save_remote_review(&self, review: &Review, ap_id: &str, movie_title: &str, release_year: u16, poster_url: Option<&str>) -> Result<()> { + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + + let rows = sqlx::query( + "SELECT f.remote_actor_url, + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_followers f + LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ? AND f.status = 'pending'", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + + Ok(rows.into_iter().map(|row| RemoteActor { + url: row.get("remote_actor_url"), + handle: row.try_get("handle").unwrap_or_default(), + inbox_url: row.try_get("inbox_url").unwrap_or_default(), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + }).collect()) + } + + async fn update_following_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowingStatus, + ) -> Result<()> { + let uid = local_user_id.to_string(); + let status_str = match status { + FollowingStatus::Pending => "pending", + FollowingStatus::Accepted => "accepted", + }; + + let result = sqlx::query( + "UPDATE ap_following SET status = ? WHERE local_user_id = ? AND remote_actor_url = ?", + ) + .bind(status_str) + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + + if result.rows_affected() == 0 { + tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_following_status: no row found"); + } + + Ok(()) + } +} + +// --- Content-specific repository (movies-diary) --- + +#[async_trait] +impl RemoteReviewRepository for SqliteFederationRepository { + async fn save_remote_review( + &self, + review: &Review, + ap_id: &str, + movie_title: &str, + release_year: u16, + poster_url: Option<&str>, + ) -> Result<()> { let actor_url = match review.source() { ReviewSource::Remote { actor_url } => actor_url.clone(), ReviewSource::Local => { @@ -339,9 +376,6 @@ impl FederationRepository for SqliteFederationRepository { let movie_id = review.movie_id().value().to_string(); - // Stub movie so the feed INNER JOIN on movies always resolves. - // release_year 0 means unknown — clamp to 1888 (valid ReleaseYear range: 1888-2200). - // ON CONFLICT updates poster_path if a newer review carries one. let _ = sqlx::query( "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) VALUES (?, NULL, ?, ?, NULL, ?) @@ -350,7 +384,7 @@ impl FederationRepository for SqliteFederationRepository { ) .bind(&movie_id) .bind(movie_title) - .bind(release_year.max(1888) as i64) // ReleaseYear requires >= 1888 + .bind(release_year.max(1888) as i64) .bind(poster_url) .execute(&self.pool) .await?; @@ -381,7 +415,7 @@ impl FederationRepository for SqliteFederationRepository { Ok(()) } - async fn delete_remote_review_by_ap_id(&self, ap_id: &str, actor_url: &str) -> Result<()> { + async fn delete_remote_review(&self, ap_id: &str, actor_url: &str) -> Result<()> { sqlx::query("DELETE FROM reviews WHERE ap_id = ? AND remote_actor_url = ?") .bind(ap_id) .bind(actor_url) @@ -413,66 +447,11 @@ impl FederationRepository for SqliteFederationRepository { Ok(()) } - async fn delete_remote_reviews_by_actor(&self, actor_url: &str) -> Result<()> { + async fn delete_by_actor(&self, actor_url: &str) -> Result<()> { sqlx::query("DELETE FROM reviews WHERE remote_actor_url = ?") .bind(actor_url) .execute(&self.pool) .await?; Ok(()) } - - async fn update_following_status( - &self, - local_user_id: UserId, - remote_actor_url: &str, - status: FollowingStatus, - ) -> Result<()> { - let uid = local_user_id.value().to_string(); - let status_str = match status { - FollowingStatus::Pending => "pending", - FollowingStatus::Accepted => "accepted", - }; - - let result = sqlx::query( - "UPDATE ap_following SET status = ? WHERE local_user_id = ? AND remote_actor_url = ?", - ) - .bind(status_str) - .bind(&uid) - .bind(remote_actor_url) - .execute(&self.pool) - .await?; - - if result.rows_affected() == 0 { - tracing::warn!( - local_user_id = %local_user_id.value(), - remote_actor_url = remote_actor_url, - "update_following_status: no row found" - ); - } - - Ok(()) - } - - async fn get_pending_followers(&self, local_user_id: UserId) -> Result> { - let uid = local_user_id.value().to_string(); - - let rows = sqlx::query( - "SELECT f.remote_actor_url, - a.handle, a.inbox_url, a.shared_inbox_url, a.display_name - FROM ap_followers f - LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url - WHERE f.local_user_id = ? AND f.status = 'pending'", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await?; - - Ok(rows.into_iter().map(|row| RemoteActor { - url: row.get("remote_actor_url"), - handle: row.try_get("handle").unwrap_or_default(), - inbox_url: row.try_get("inbox_url").unwrap_or_default(), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - }).collect()) - } } diff --git a/crates/presentation/src/extractors.rs b/crates/presentation/src/extractors.rs index c7ce0e1..9b82a23 100644 --- a/crates/presentation/src/extractors.rs +++ b/crates/presentation/src/extractors.rs @@ -243,35 +243,34 @@ mod tests { sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))") .execute(&pool).await.unwrap(); let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool)); - struct DummyUserRepo; - #[async_trait::async_trait] impl domain::ports::UserRepository for DummyUserRepo { - async fn find_by_email(&self, _: &domain::value_objects::Email) -> Result, domain::errors::DomainError> { Ok(None) } - async fn save(&self, _: &domain::models::User) -> Result<(), domain::errors::DomainError> { Ok(()) } - async fn find_by_id(&self, _: &domain::value_objects::UserId) -> Result, domain::errors::DomainError> { Ok(None) } - async fn find_by_username(&self, _: &domain::value_objects::Username) -> Result, domain::errors::DomainError> { Ok(None) } - async fn list_with_stats(&self) -> Result, domain::errors::DomainError> { Ok(vec![]) } + + struct DummyApUserRepo; + #[async_trait::async_trait] + impl activitypub::ApUserRepository for DummyApUserRepo { + async fn find_by_id(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(None) } + async fn find_by_username(&self, _: &str) -> anyhow::Result> { Ok(None) } } - struct DummyMovieRepo; - #[async_trait::async_trait] impl domain::ports::MovieRepository for DummyMovieRepo { - async fn get_movie_by_external_id(&self, _: &domain::value_objects::ExternalMetadataId) -> Result, domain::errors::DomainError> { Ok(None) } - async fn get_movie_by_id(&self, _: &domain::value_objects::MovieId) -> Result, domain::errors::DomainError> { Ok(None) } - async fn get_movies_by_title_and_year(&self, _: &domain::value_objects::MovieTitle, _: &domain::value_objects::ReleaseYear) -> Result, domain::errors::DomainError> { Ok(vec![]) } - async fn upsert_movie(&self, _: &domain::models::Movie) -> Result<(), domain::errors::DomainError> { Ok(()) } - async fn save_review(&self, _: &domain::models::Review) -> Result { panic!() } - async fn query_diary(&self, _: &domain::models::DiaryFilter) -> Result, domain::errors::DomainError> { panic!() } - async fn get_review_history(&self, _: &domain::value_objects::MovieId) -> Result { panic!() } - async fn get_review_by_id(&self, _: &domain::value_objects::ReviewId) -> Result, domain::errors::DomainError> { Ok(None) } - async fn delete_review(&self, _: &domain::value_objects::ReviewId) -> Result<(), domain::errors::DomainError> { Ok(()) } - async fn delete_movie(&self, _: &domain::value_objects::MovieId) -> Result<(), domain::errors::DomainError> { Ok(()) } - async fn query_activity_feed(&self, _: &domain::models::collections::PageParams) -> Result, domain::errors::DomainError> { panic!() } - async fn get_user_stats(&self, _: &domain::value_objects::UserId) -> Result { panic!() } - async fn get_user_history(&self, _: &domain::value_objects::UserId) -> Result, domain::errors::DomainError> { Ok(vec![]) } - async fn get_user_trends(&self, _: &domain::value_objects::UserId) -> Result { panic!() } + + struct DummyObjectHandler; + #[async_trait::async_trait] + impl activitypub::ApObjectHandler for DummyObjectHandler { + async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } + async fn on_create(&self, _: &url::Url, _: &url::Url, _: serde_json::Value) -> anyhow::Result<()> { Ok(()) } + async fn on_update(&self, _: &url::Url, _: &url::Url, _: serde_json::Value) -> anyhow::Result<()> { Ok(()) } + async fn on_delete(&self, _: &url::Url, _: &url::Url) -> anyhow::Result<()> { Ok(()) } + async fn on_actor_removed(&self, _: &url::Url) -> anyhow::Result<()> { Ok(()) } } + Arc::new( - activitypub::ActivityPubService::new(fed_repo, Arc::new(DummyUserRepo), Arc::new(DummyMovieRepo), "http://localhost:3000".to_string(), true) - .await - .unwrap(), + activitypub::ActivityPubService::new( + fed_repo, + Arc::new(DummyApUserRepo), + Arc::new(DummyObjectHandler), + "http://localhost:3000".to_string(), + true, + ) + .await + .unwrap(), ) } diff --git a/crates/presentation/src/handlers.rs b/crates/presentation/src/handlers.rs index b78bc8c..846e245 100644 --- a/crates/presentation/src/handlers.rs +++ b/crates/presentation/src/handlers.rs @@ -340,7 +340,7 @@ pub mod html { let following_count = if is_own_profile { if let Some(ref uid) = user_id { - state.ap_service.count_following(uid.clone()).await.unwrap_or(0) + state.ap_service.count_following(uid.value()).await.unwrap_or(0) } else { 0 } @@ -350,7 +350,7 @@ pub mod html { let followers_count = if is_own_profile { state.ap_service - .count_accepted_followers(domain::value_objects::UserId::from_uuid(profile_user_uuid)) + .count_accepted_followers(profile_user_uuid) .await .unwrap_or(0) } else { @@ -359,7 +359,7 @@ pub mod html { let pending_followers = if is_own_profile { state.ap_service - .get_pending_followers(domain::value_objects::UserId::from_uuid(profile_user_uuid)) + .get_pending_followers(profile_user_uuid) .await .unwrap_or_default() .into_iter() @@ -425,7 +425,7 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.follow(user_id.clone(), &form.handle).await { + match state.ap_service.follow(user_id.value(), &form.handle).await { Ok(()) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(), Err(e) => { tracing::error!("follow error: {:?}", e); @@ -444,7 +444,7 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.unfollow(user_id.clone(), &form.actor_url).await { + match state.ap_service.unfollow(user_id.value(), &form.actor_url).await { Ok(()) => Redirect::to(&format!("/users/{}/following-list", profile_user_uuid)).into_response(), Err(e) => { let msg = encode_error(&e.to_string()); @@ -462,7 +462,7 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.accept_follower(user_id, &form.actor_url).await { + match state.ap_service.accept_follower(user_id.value(), &form.actor_url).await { Ok(_) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(), Err(e) => { let msg = encode_error(&e.to_string()); @@ -480,7 +480,7 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.reject_follower(user_id, &form.actor_url).await { + match state.ap_service.reject_follower(user_id.value(), &form.actor_url).await { Ok(_) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(), Err(e) => { let msg = encode_error(&e.to_string()); @@ -501,7 +501,7 @@ pub mod html { let mut ctx = build_page_context(&state, Some(user_id.clone())).await; ctx.page_title = "Following — Movies Diary".to_string(); ctx.canonical_url = format!("{}/users/{}/following-list", state.app_ctx.config.base_url, profile_user_uuid); - match state.ap_service.get_following(user_id).await { + match state.ap_service.get_following(user_id.value()).await { Ok(following) => { let actors = following.into_iter().map(|a| RemoteActorView { handle: a.handle, @@ -538,7 +538,7 @@ pub mod html { let mut ctx = build_page_context(&state, Some(user_id.clone())).await; ctx.page_title = "Followers — Movies Diary".to_string(); ctx.canonical_url = format!("{}/users/{}/followers-list", state.app_ctx.config.base_url, profile_user_uuid); - match state.ap_service.get_accepted_followers(user_id).await { + match state.ap_service.get_accepted_followers(user_id.value()).await { Ok(followers) => { let actors = followers.into_iter().map(|a| RemoteActorView { handle: a.handle, @@ -572,7 +572,7 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.remove_follower(user_id, &form.actor_url).await { + match state.ap_service.remove_follower(user_id.value(), &form.actor_url).await { Ok(_) => Redirect::to(&format!("/users/{}/followers-list", profile_user_uuid)).into_response(), Err(e) => { let msg = encode_error(&e.to_string()); diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index e1614a4..488c194 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -15,7 +15,7 @@ use auth::{AuthConfig, Argon2PasswordHasher, JwtAuthService}; use metadata::MetadataClientImpl; use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher}; use poster_storage::{PosterStorageAdapter, StorageConfig}; -use activitypub::ActivityPubService; +use activitypub::{ActivityPubEventHandler, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler}; use sqlite::{SqliteFederationRepository, SqliteMovieRepository, SqliteUserRepository}; use rss::RssAdapter; use template_askama::AskamaHtmlRenderer; @@ -94,17 +94,27 @@ async fn wire_dependencies() -> anyhow::Result { // Federation let federation_repo = Arc::new(SqliteFederationRepository::new(pool)); + let user_repo_adapter = Arc::new(DomainUserRepoAdapter(Arc::clone(&user_repository))); + let review_handler = Arc::new(ReviewObjectHandler { + movie_repo: Arc::clone(&repository), + review_store: Arc::clone(&federation_repo) as Arc, + base_url: app_config.base_url.clone(), + }); let ap_service = Arc::new( ActivityPubService::new( federation_repo, - Arc::clone(&user_repository), - Arc::clone(&repository), + user_repo_adapter, + review_handler, app_config.base_url.clone(), cfg!(debug_assertions), ) .await?, ); - let ap_event_handler = ap_service.event_handler(); + let ap_event_handler = ActivityPubEventHandler::new( + Arc::clone(&ap_service), + Arc::clone(&repository), + app_config.base_url.clone(), + ); let poster_handler = PosterSyncHandler::new(handler_ctx, 3); let (event_publisher, event_worker) = create_event_channel(