From 7a43eb4de69196bf72fa6c50e076d6554eef97f3 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sat, 9 May 2026 17:27:57 +0200 Subject: [PATCH] activitypub: remove files moved to activitypub-base --- crates/adapters/activitypub/src/activities.rs | 361 ------------- .../adapters/activitypub/src/actor_handler.rs | 24 - crates/adapters/activitypub/src/actors.rs | 235 --------- crates/adapters/activitypub/src/data.rs | 38 -- crates/adapters/activitypub/src/error.rs | 49 -- crates/adapters/activitypub/src/federation.rs | 52 -- .../activitypub/src/followers_handler.rs | 80 --- crates/adapters/activitypub/src/inbox.rs | 20 - crates/adapters/activitypub/src/outbox.rs | 56 -- crates/adapters/activitypub/src/repository.rs | 76 --- crates/adapters/activitypub/src/service.rs | 485 ------------------ crates/adapters/activitypub/src/webfinger.rs | 46 -- 12 files changed, 1522 deletions(-) delete mode 100644 crates/adapters/activitypub/src/activities.rs delete mode 100644 crates/adapters/activitypub/src/actor_handler.rs delete mode 100644 crates/adapters/activitypub/src/actors.rs delete mode 100644 crates/adapters/activitypub/src/data.rs delete mode 100644 crates/adapters/activitypub/src/error.rs delete mode 100644 crates/adapters/activitypub/src/federation.rs delete mode 100644 crates/adapters/activitypub/src/followers_handler.rs delete mode 100644 crates/adapters/activitypub/src/inbox.rs delete mode 100644 crates/adapters/activitypub/src/outbox.rs delete mode 100644 crates/adapters/activitypub/src/repository.rs delete mode 100644 crates/adapters/activitypub/src/service.rs delete mode 100644 crates/adapters/activitypub/src/webfinger.rs diff --git a/crates/adapters/activitypub/src/activities.rs b/crates/adapters/activitypub/src/activities.rs deleted file mode 100644 index 63fa748..0000000 --- a/crates/adapters/activitypub/src/activities.rs +++ /dev/null @@ -1,361 +0,0 @@ -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::{AcceptType, CreateType, DeleteType, FollowType, RejectType, UndoType, UpdateType}, - traits::{Activity, Object}, -}; -use serde::{Deserialize, Serialize}; -use url::Url; - -use crate::actors::DbActor; -use crate::data::FederationData; -use crate::error::Error; -use crate::objects::{DbReview, ReviewObject}; -use crate::repository::{FollowerStatus, FollowingStatus}; - -// --- Follow --- - -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct FollowActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: FollowType, - pub(crate) actor: ObjectId, - pub(crate) object: ObjectId, -} - -#[async_trait::async_trait] -impl Activity for FollowActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, data: &Data) -> Result<(), Self::Error> { - let target_url = self.object.inner(); - // Url::domain() strips the port, so build host:port explicitly - let target_domain = match (target_url.host_str(), target_url.port()) { - (Some(host), Some(port)) => format!("{}:{}", host, port), - (Some(host), None) => host.to_string(), - _ => return Err(Error::bad_request(anyhow::anyhow!("invalid follow target URL"))), - }; - if target_domain != data.domain { - return Err(Error::bad_request(anyhow::anyhow!("follow target is not a local actor"))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let _follower = self.actor.dereference(data).await?; - let local_actor = self.object.dereference(data).await?; - - data.federation_repo - .add_follower( - local_actor.user_id.clone(), - self.actor.inner().as_str(), - FollowerStatus::Pending, - self.id.as_str(), - ) - .await?; - - tracing::info!( - follower = %self.actor.inner(), - local_user = %local_actor.user_id.value(), - "follow request pending approval" - ); - Ok(()) - } -} - -// --- Accept --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AcceptActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: AcceptType, - pub(crate) actor: ObjectId, - pub(crate) object: FollowActivity, -} - -#[async_trait::async_trait] -impl Activity for AcceptActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let local_user_id = crate::urls::extract_user_id_from_url(self.object.actor.inner()) - .ok_or_else(|| Error::bad_request(anyhow::anyhow!("invalid actor URL in Follow")))?; - let remote_actor_url = self.actor.inner().as_str(); - data.federation_repo - .update_following_status(local_user_id, remote_actor_url, FollowingStatus::Accepted) - .await?; - tracing::info!( - remote_actor = %self.actor.inner(), - "follow accepted by remote" - ); - Ok(()) - } -} - -// --- Reject --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct RejectActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: RejectType, - pub(crate) actor: ObjectId, - pub(crate) object: FollowActivity, -} - -#[async_trait::async_trait] -impl Activity for RejectActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - // The actor rejected our follow. Extract the local user from the original Follow's actor. - let local_user_url = self.object.actor.inner(); - if let Some(user_id) = crate::urls::extract_user_id_from_url(local_user_url) { - data.federation_repo - .remove_following(user_id, self.actor.inner().as_str()) - .await?; - } - tracing::info!(actor = %self.actor.inner(), "follow rejected"); - Ok(()) - } -} - -// --- Undo --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct UndoActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: UndoType, - pub(crate) actor: ObjectId, - pub(crate) object: FollowActivity, -} - -#[async_trait::async_trait] -impl Activity for UndoActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - // Remote actor is unfollowing a local user - let local_user_url = self.object.object.inner(); - if let Some(user_id) = crate::urls::extract_user_id_from_url(local_user_url) { - data.federation_repo - .remove_follower(user_id, self.actor.inner().as_str()) - .await?; - } - tracing::info!(actor = %self.actor.inner(), "unfollowed"); - Ok(()) - } -} - -// --- Create --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: CreateType, - pub(crate) actor: ObjectId, - pub(crate) object: ReviewObject, -} - -#[async_trait::async_trait] -impl Activity for CreateActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - if self.object.attributed_to.inner() != self.actor.inner() { - return Err(Error::bad_request(anyhow::anyhow!( - "activity actor does not match object attributed_to" - ))); - } - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - DbReview::from_json(self.object, data).await?; - tracing::info!(actor = %self.actor.inner(), "received review"); - Ok(()) - } -} - -// --- Delete --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct DeleteActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: DeleteType, - pub(crate) actor: ObjectId, - pub(crate) object: Url, -} - -#[async_trait::async_trait] -impl Activity for DeleteActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - data.federation_repo - .delete_remote_review_by_ap_id( - self.object.as_str(), - self.actor.inner().as_str(), - ) - .await?; - tracing::info!(object = %self.object, "remote review deleted"); - Ok(()) - } -} - -// --- Update --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct UpdateActivity { - pub(crate) id: Url, - #[serde(rename = "type", default)] - pub(crate) kind: UpdateType, - pub(crate) actor: ObjectId, - pub(crate) object: serde_json::Value, -} - -#[async_trait::async_trait] -impl Activity for UpdateActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { - Ok(()) - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let object: ReviewObject = match serde_json::from_value(self.object) { - Ok(o) => o, - Err(_) => { - tracing::debug!(actor = %self.actor.inner(), "ignoring non-review Update activity"); - return Ok(()); - } - }; - if object.attributed_to.inner() != self.actor.inner() { - return Err(Error::bad_request(anyhow::anyhow!( - "update actor does not match object attributed_to" - ))); - } - let ap_id = object.id.inner().as_str(); - let rating = object.rating.min(5); - let comment = object.comment.as_deref(); - let watched_at = object.watched_at.naive_utc(); - data.federation_repo - .update_remote_review(ap_id, self.actor.inner().as_str(), rating, comment, watched_at) - .await?; - tracing::info!(actor = %self.actor.inner(), ap_id = %ap_id, "remote review updated"); - Ok(()) - } -} - -// --- Inbox dispatch enum --- - -#[derive(Debug, Deserialize, Serialize)] -#[serde(tag = "type")] -#[enum_delegate::implement(Activity)] -pub enum InboxActivities { - #[serde(rename = "Follow")] - Follow(FollowActivity), - #[serde(rename = "Accept")] - Accept(AcceptActivity), - #[serde(rename = "Reject")] - Reject(RejectActivity), - #[serde(rename = "Undo")] - Undo(UndoActivity), - #[serde(rename = "Create")] - Create(CreateActivity), - #[serde(rename = "Delete")] - Delete(DeleteActivity), - #[serde(rename = "Update")] - Update(UpdateActivity), -} diff --git a/crates/adapters/activitypub/src/actor_handler.rs b/crates/adapters/activitypub/src/actor_handler.rs deleted file mode 100644 index adcf3e8..0000000 --- a/crates/adapters/activitypub/src/actor_handler.rs +++ /dev/null @@ -1,24 +0,0 @@ -use activitypub_federation::{ - axum::json::FederationJson, config::Data, protocol::context::WithContext, traits::Object, -}; -use axum::extract::Path; - -use domain::value_objects::UserId; - -use crate::actors::{get_local_actor, Person}; -use crate::data::FederationData; -use crate::error::Error; - -pub async fn actor_handler( - Path(user_id_str): Path, - data: Data, -) -> Result>, Error> { - let uuid = uuid::Uuid::parse_str(&user_id_str) - .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; - let user_id = UserId::from_uuid(uuid); - - let db_actor = get_local_actor(user_id, &data).await?; - let person = db_actor.into_json(&data).await?; - - Ok(FederationJson(WithContext::new_default(person))) -} diff --git a/crates/adapters/activitypub/src/actors.rs b/crates/adapters/activitypub/src/actors.rs deleted file mode 100644 index 3cc2f7a..0000000 --- a/crates/adapters/activitypub/src/actors.rs +++ /dev/null @@ -1,235 +0,0 @@ -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - http_signatures::generate_actor_keypair, - kinds::actor::PersonType, - protocol::{public_key::PublicKey, verification::verify_domains_match}, - traits::{Actor, Object}, -}; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use url::Url; - -use domain::value_objects::UserId; - -use crate::data::FederationData; -use crate::error::Error; -use crate::repository::RemoteActor; - -#[derive(Debug, Clone)] -pub struct DbActor { - pub user_id: UserId, - pub username: String, - pub public_key_pem: String, - pub private_key_pem: Option, - pub inbox_url: Url, - pub outbox_url: Url, - pub followers_url: Url, - pub following_url: Url, - pub ap_id: Url, - pub last_refreshed_at: DateTime, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct Person { - #[serde(rename = "type")] - kind: PersonType, - id: ObjectId, - preferred_username: String, - inbox: Url, - outbox: Url, - followers: Url, - following: Url, - public_key: PublicKey, - name: Option, -} - -pub async fn get_local_actor( - user_id: UserId, - data: &Data, -) -> Result { - let user = data - .user_repo - .find_by_id(&user_id) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found: {}", user_id.value())))?; - - let (public_key, private_key) = match data - .federation_repo - .get_local_actor_keypair(user_id.clone()) - .await? - { - Some(kp) => kp, - None => { - let kp = generate_actor_keypair()?; - data.federation_repo - .save_local_actor_keypair( - user_id.clone(), - kp.public_key.clone(), - kp.private_key.clone(), - ) - .await?; - (kp.public_key, kp.private_key) - } - }; - - let ap_id = crate::urls::actor_url(&data.base_url, user.id()); - let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid inbox url"); - let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid outbox url"); - let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid followers url"); - let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid following url"); - - Ok(DbActor { - user_id: user.id().clone(), - username: user.username().value().to_string(), - public_key_pem: public_key, - private_key_pem: Some(private_key), - inbox_url, - outbox_url, - followers_url, - following_url, - ap_id, - last_refreshed_at: Utc::now(), - }) -} - -#[async_trait::async_trait] -impl Object for DbActor { - type DataType = FederationData; - type Kind = Person; - type Error = Error; - - fn id(&self) -> &Url { - &self.ap_id - } - - fn last_refreshed_at(&self) -> Option> { - Some(self.last_refreshed_at) - } - - async fn read_from_id( - object_id: Url, - data: &Data, - ) -> Result, Self::Error> { - // Extract user_id from URL path: /users/{uuid} - let user_id = match crate::urls::extract_user_id_from_url(&object_id) { - Some(id) => id, - None => return Ok(None), - }; - let user = match data.user_repo.find_by_id(&user_id).await { - Ok(Some(u)) => u, - _ => return Ok(None), - }; - - let keypair = data - .federation_repo - .get_local_actor_keypair(user_id.clone()) - .await?; - - let (public_key, private_key) = match keypair { - Some(kp) => (kp.0, Some(kp.1)), - None => return Ok(None), - }; - - let ap_id = crate::urls::actor_url(&data.base_url, user.id()); - let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url"); - let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url"); - let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid url"); - let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid url"); - - Ok(Some(DbActor { - user_id: user.id().clone(), - username: user.username().value().to_string(), - public_key_pem: public_key, - private_key_pem: private_key, - inbox_url, - outbox_url, - followers_url, - following_url, - ap_id, - last_refreshed_at: Utc::now(), - })) - } - - async fn into_json(self, _data: &Data) -> Result { - let public_key = PublicKey { - id: format!("{}#main-key", &self.ap_id), - owner: self.ap_id.clone(), - public_key_pem: self.public_key_pem.clone(), - }; - - Ok(Person { - kind: Default::default(), - id: self.ap_id.clone().into(), - preferred_username: self.username.clone(), - inbox: self.inbox_url.clone(), - outbox: self.outbox_url.clone(), - followers: self.followers_url.clone(), - following: self.following_url.clone(), - public_key, - name: Some(self.username.clone()), - }) - } - - async fn verify( - json: &Self::Kind, - expected_domain: &Url, - _data: &Data, - ) -> Result<(), Self::Error> { - verify_domains_match(json.id.inner(), expected_domain)?; - Ok(()) - } - - async fn from_json( - json: Self::Kind, - data: &Data, - ) -> Result { - let actor = RemoteActor { - url: json.id.inner().to_string(), - handle: json.preferred_username.clone(), - inbox_url: json.inbox.to_string(), - shared_inbox_url: None, - display_name: json.name.clone(), - }; - data.federation_repo.upsert_remote_actor(actor).await?; - - // Deterministic UUID from actor URL so the same remote actor always maps to the same UserId - let url_str = json.id.inner().to_string(); - let stable_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes()); - let user_id = UserId::from_uuid(stable_id); - let ap_id = json.id.inner().clone(); - let inbox_url = json.inbox.clone(); - let outbox_url = json.outbox.clone(); - let followers_url = json.followers.clone(); - let following_url = json.following.clone(); - - Ok(DbActor { - user_id, - username: json.preferred_username.clone(), - public_key_pem: json.public_key.public_key_pem, - private_key_pem: None, - inbox_url, - outbox_url, - followers_url, - following_url, - ap_id, - last_refreshed_at: Utc::now(), - }) - } -} - -impl Actor for DbActor { - fn public_key_pem(&self) -> &str { - &self.public_key_pem - } - - fn private_key_pem(&self) -> Option { - self.private_key_pem.clone() - } - - fn inbox(&self) -> Url { - self.inbox_url.clone() - } -} diff --git a/crates/adapters/activitypub/src/data.rs b/crates/adapters/activitypub/src/data.rs deleted file mode 100644 index 6499291..0000000 --- a/crates/adapters/activitypub/src/data.rs +++ /dev/null @@ -1,38 +0,0 @@ -use std::sync::Arc; - -use domain::ports::{MovieRepository, UserRepository}; - -use crate::repository::FederationRepository; - -#[derive(Clone)] -pub struct FederationData { - pub(crate) federation_repo: Arc, - pub(crate) user_repo: Arc, - pub(crate) movie_repo: Arc, - pub(crate) base_url: String, - pub(crate) domain: String, -} - -impl FederationData { - pub fn new( - federation_repo: Arc, - user_repo: Arc, - movie_repo: Arc, - base_url: String, - ) -> Self { - let domain = base_url - .trim_start_matches("https://") - .trim_start_matches("http://") - .split('/') - .next() - .unwrap_or("") - .to_string(); - Self { - federation_repo, - user_repo, - movie_repo, - base_url, - domain, - } - } -} diff --git a/crates/adapters/activitypub/src/error.rs b/crates/adapters/activitypub/src/error.rs deleted file mode 100644 index bd60e2d..0000000 --- a/crates/adapters/activitypub/src/error.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::fmt::{Display, Formatter}; - -use axum::http::StatusCode; - -#[derive(Debug)] -pub struct Error(pub(crate) anyhow::Error, pub(crate) StatusCode); - -impl Error { - pub fn not_found(e: impl Into) -> Self { - Self(e.into(), StatusCode::NOT_FOUND) - } - - pub fn bad_request(e: impl Into) -> Self { - Self(e.into(), StatusCode::BAD_REQUEST) - } - -} - -impl Display for Error { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} - -impl From for Error -where - T: Into, -{ - fn from(t: T) -> Self { - Error(t.into(), StatusCode::INTERNAL_SERVER_ERROR) - } -} - -impl axum::response::IntoResponse for Error { - fn into_response(self) -> axum::response::Response { - let status = self.1; - if status.is_server_error() { - tracing::error!(error = %self.0, status = status.as_u16(), "federation error"); - } else { - tracing::debug!(error = %self.0, status = status.as_u16(), "federation response"); - } - let body = if status.is_server_error() { - "internal server error".to_string() - } else { - self.0.to_string() - }; - (status, body).into_response() - } -} diff --git a/crates/adapters/activitypub/src/federation.rs b/crates/adapters/activitypub/src/federation.rs deleted file mode 100644 index 1aa185a..0000000 --- a/crates/adapters/activitypub/src/federation.rs +++ /dev/null @@ -1,52 +0,0 @@ -use activitypub_federation::config::{Data, FederationConfig, FederationMiddleware, UrlVerifier}; -use activitypub_federation::error::Error as FedError; -use url::Url; - -use crate::data::FederationData; - -// In debug mode, allow all URLs (including http://localhost:3000 where the -// port colon would otherwise fail the default domain character check). -#[derive(Clone)] -struct PermissiveVerifier; - -#[async_trait::async_trait] -impl UrlVerifier for PermissiveVerifier { - async fn verify(&self, _url: &Url) -> Result<(), FedError> { - Ok(()) - } -} - -#[derive(Clone)] -pub struct ApFederationConfig(pub FederationConfig); - -impl ApFederationConfig { - pub async fn new(data: FederationData, debug: bool) -> anyhow::Result { - let config = if debug { - FederationConfig::builder() - .domain(&data.domain) - .app_data(data) - .debug(true) - .http_signature_compat(true) - .url_verifier(Box::new(PermissiveVerifier)) - .build() - .await? - } else { - FederationConfig::builder() - .domain(&data.domain) - .app_data(data) - .debug(false) - .http_signature_compat(true) - .build() - .await? - }; - Ok(Self(config)) - } - - pub fn to_request_data(&self) -> Data { - self.0.to_request_data() - } - - pub fn middleware(&self) -> FederationMiddleware { - FederationMiddleware::new(self.0.clone()) - } -} diff --git a/crates/adapters/activitypub/src/followers_handler.rs b/crates/adapters/activitypub/src/followers_handler.rs deleted file mode 100644 index 955bd82..0000000 --- a/crates/adapters/activitypub/src/followers_handler.rs +++ /dev/null @@ -1,80 +0,0 @@ -use activitypub_federation::{axum::json::FederationJson, config::Data}; -use axum::extract::Path; -use serde_json::json; - -use domain::value_objects::UserId; - -use crate::data::FederationData; -use crate::error::Error; -use crate::repository::FollowerStatus; - -fn ordered_collection(id: String, total: usize, items: Vec) -> serde_json::Value { - json!({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollection", - "id": id, - "totalItems": total, - "orderedItems": items, - }) -} - -pub async fn followers_handler( - Path(user_id_str): Path, - data: Data, -) -> Result, Error> { - let user_id = UserId::from_uuid( - uuid::Uuid::parse_str(&user_id_str) - .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?, - ); - - data.user_repo - .find_by_id(&user_id) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let followers = data - .federation_repo - .get_followers(user_id) - .await - .map_err(Error::from)?; - - let items: Vec = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .map(|f| f.actor.url) - .collect(); - - let id = format!("{}/users/{}/followers", data.base_url, user_id_str); - Ok(FederationJson(ordered_collection(id, items.len(), items))) -} - -pub async fn following_handler( - Path(user_id_str): Path, - data: Data, -) -> Result, Error> { - let user_id = UserId::from_uuid( - uuid::Uuid::parse_str(&user_id_str) - .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?, - ); - - data.user_repo - .find_by_id(&user_id) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let following = data - .federation_repo - .get_following(user_id) - .await - .map_err(Error::from)?; - - let items: Vec = following - .into_iter() - .map(|a| a.url) - .collect(); - - let id = format!("{}/users/{}/following", data.base_url, user_id_str); - Ok(FederationJson(ordered_collection(id, items.len(), items))) -} diff --git a/crates/adapters/activitypub/src/inbox.rs b/crates/adapters/activitypub/src/inbox.rs deleted file mode 100644 index acee339..0000000 --- a/crates/adapters/activitypub/src/inbox.rs +++ /dev/null @@ -1,20 +0,0 @@ -use activitypub_federation::{ - axum::inbox::{receive_activity, ActivityData}, - config::Data, - protocol::context::WithContext, -}; - -use crate::activities::InboxActivities; -use crate::actors::DbActor; -use crate::data::FederationData; -use crate::error::Error; - -pub async fn inbox_handler( - data: Data, - activity_data: ActivityData, -) -> Result<(), Error> { - receive_activity::, DbActor, FederationData>( - activity_data, &data, - ) - .await -} diff --git a/crates/adapters/activitypub/src/outbox.rs b/crates/adapters/activitypub/src/outbox.rs deleted file mode 100644 index 4dc47ca..0000000 --- a/crates/adapters/activitypub/src/outbox.rs +++ /dev/null @@ -1,56 +0,0 @@ -use activitypub_federation::{axum::json::FederationJson, config::Data}; -use axum::extract::Path; -use serde::{Deserialize, Serialize}; - -use domain::{models::ReviewSource, value_objects::UserId}; - -use crate::data::FederationData; -use crate::error::Error; - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct OrderedCollection { - #[serde(rename = "@context")] - context: String, - #[serde(rename = "type")] - kind: String, - id: String, - total_items: u64, - ordered_items: Vec, -} - -pub async fn outbox_handler( - Path(user_id_str): Path, - data: Data, -) -> Result, Error> { - let uuid = uuid::Uuid::parse_str(&user_id_str) - .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; - let user_id = UserId::from_uuid(uuid); - - data.user_repo - .find_by_id(&user_id) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let history = data - .movie_repo - .get_user_history(&user_id) - .await - .map_err(Error::from)?; - - let local_count = history - .iter() - .filter(|e| matches!(e.review().source(), ReviewSource::Local)) - .count(); - - let outbox_url = format!("{}/users/{}/outbox", data.base_url, user_id_str); - - Ok(FederationJson(OrderedCollection { - context: "https://www.w3.org/ns/activitystreams".to_string(), - kind: "OrderedCollection".to_string(), - id: outbox_url, - total_items: local_count as u64, - ordered_items: vec![], - })) -} diff --git a/crates/adapters/activitypub/src/repository.rs b/crates/adapters/activitypub/src/repository.rs deleted file mode 100644 index 22b91f0..0000000 --- a/crates/adapters/activitypub/src/repository.rs +++ /dev/null @@ -1,76 +0,0 @@ -use anyhow::Result; -use async_trait::async_trait; -use chrono::NaiveDateTime; -use domain::models::Review; -use domain::value_objects::UserId; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FollowerStatus { - Pending, - Accepted, - Rejected, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FollowingStatus { - Pending, - Accepted, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct RemoteActor { - pub url: String, - pub handle: String, - pub inbox_url: String, - pub shared_inbox_url: Option, - pub display_name: Option, -} - -#[derive(Debug, Clone)] -pub struct Follower { - pub actor: RemoteActor, - pub status: FollowerStatus, -} - -#[async_trait] -pub trait FederationRepository: Send + Sync { - async fn add_follower(&self, local_user_id: UserId, remote_actor_url: &str, status: FollowerStatus, follow_activity_id: &str) -> Result<()>; - async fn get_follower_follow_activity_id(&self, local_user_id: UserId, remote_actor_url: &str) -> Result>; - async fn remove_follower(&self, local_user_id: UserId, remote_actor_url: &str) -> Result<()>; - async fn get_followers(&self, local_user_id: UserId) -> Result>; - async fn update_follower_status(&self, local_user_id: UserId, remote_actor_url: &str, status: FollowerStatus) -> Result<()>; - async fn add_following(&self, local_user_id: UserId, actor: RemoteActor, follow_activity_id: &str) -> Result<()>; - async fn get_follow_activity_id(&self, local_user_id: UserId, remote_actor_url: &str) -> Result>; - async fn remove_following(&self, local_user_id: UserId, actor_url: &str) -> Result<()>; - async fn get_following(&self, local_user_id: UserId) -> Result>; - async fn count_following(&self, local_user_id: UserId) -> Result; - async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>; - async fn get_remote_actor(&self, actor_url: &str) -> Result>; - async fn save_remote_review( - &self, - review: &Review, - ap_id: &str, - movie_title: &str, - release_year: u16, - poster_url: Option<&str>, - ) -> Result<()>; - async fn delete_remote_review_by_ap_id(&self, ap_id: &str, actor_url: &str) -> Result<()>; - async fn update_remote_review( - &self, - ap_id: &str, - actor_url: &str, - rating: u8, - comment: Option<&str>, - watched_at: NaiveDateTime, - ) -> Result<()>; - async fn get_local_actor_keypair(&self, user_id: UserId) -> Result>; - async fn save_local_actor_keypair(&self, user_id: UserId, public_key: String, private_key: String) -> Result<()>; - async fn delete_remote_reviews_by_actor(&self, actor_url: &str) -> Result<()>; - async fn get_pending_followers(&self, local_user_id: UserId) -> Result>; - async fn update_following_status( - &self, - local_user_id: UserId, - remote_actor_url: &str, - status: FollowingStatus, - ) -> Result<()>; -} diff --git a/crates/adapters/activitypub/src/service.rs b/crates/adapters/activitypub/src/service.rs deleted file mode 100644 index fe64c8d..0000000 --- a/crates/adapters/activitypub/src/service.rs +++ /dev/null @@ -1,485 +0,0 @@ -use std::sync::Arc; - -use activitypub_federation::{ - activity_sending::SendActivityTask, - config::Data, - fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor}, - protocol::context::WithContext, - traits::Actor, -}; -use axum::{routing::get, routing::post, Router}; -use domain::{ports::UserRepository, value_objects::UserId}; -use url::Url; - -use crate::{ - activities::{AcceptActivity, CreateActivity, FollowActivity, RejectActivity, UndoActivity}, - actor_handler::actor_handler, - actors::{get_local_actor, DbActor}, - data::FederationData, - event_handler::ActivityPubEventHandler, - federation::ApFederationConfig, - followers_handler::{followers_handler, following_handler}, - inbox::inbox_handler, - outbox::outbox_handler, - repository::{FederationRepository, FollowerStatus, RemoteActor}, - webfinger::webfinger_handler, -}; - -pub(crate) async fn send_with_retry( - sends: Vec, - data: &Data, -) -> Vec { - let mut failures = vec![]; - for send in sends { - let mut delay = std::time::Duration::from_secs(1); - for attempt in 1..=3u32 { - match send.clone().sign_and_send(data).await { - Ok(()) => { - break; - } - Err(e) if attempt < 3 => { - tracing::warn!(attempt, error = %e, "delivery failed, retrying"); - tokio::time::sleep(delay).await; - delay *= 2; - } - Err(e) => { - tracing::error!(attempt, error = %e, "delivery failed permanently"); - failures.push(anyhow::anyhow!(e)); - } - } - } - } - failures -} - -pub struct ActivityPubService { - federation_config: ApFederationConfig, - base_url: String, -} - -impl ActivityPubService { - pub async fn new( - repo: Arc, - user_repo: Arc, - movie_repo: Arc, - base_url: String, - debug: bool, - ) -> anyhow::Result { - let data = FederationData::new(repo, user_repo, movie_repo, base_url.clone()); - let federation_config = ApFederationConfig::new(data, debug).await?; - Ok(Self { - federation_config, - base_url, - }) - } - - pub fn federation_config(&self) -> &ApFederationConfig { - &self.federation_config - } - - pub fn request_data(&self) -> Data { - self.federation_config.to_request_data() - } - - // Returns the AP actor document JSON for a local user. - // Used for content negotiation in the HTML profile handler. - pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result { - use activitypub_federation::traits::Object; - use crate::actors::get_local_actor; - let uuid = uuid::Uuid::parse_str(user_id_str)?; - let user_id = UserId::from_uuid(uuid); - let data = self.federation_config.to_request_data(); - let actor = get_local_actor(user_id, &data).await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let person = actor.into_json(&data).await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let with_context = WithContext::new_default(person); - Ok(serde_json::to_string(&with_context)?) - } - - pub fn router(&self) -> Router { - Router::new() - .route("/.well-known/webfinger", get(webfinger_handler)) - .route("/users/{user_id}", get(actor_handler)) - .route("/users/{user_id}/inbox", post(inbox_handler)) - .route("/users/{user_id}/outbox", get(outbox_handler)) - .route("/users/{user_id}/followers", get(followers_handler)) - .route("/users/{user_id}/following", get(following_handler)) - .layer(self.federation_config.middleware()) - } - - pub fn event_handler(&self) -> ActivityPubEventHandler { - ActivityPubEventHandler::new(self.federation_config.clone(), self.base_url.clone()) - } - - pub async fn follow(&self, local_user_id: UserId, handle: &str) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - let remote_actor: DbActor = webfinger_resolve_actor(handle, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let local_actor = get_local_actor(local_user_id.clone(), &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let follow_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let follow_id_str = follow_id.to_string(); - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: ObjectId::from(remote_actor.ap_id.clone()), - }; - let follow_with_ctx = WithContext::new_default(follow); - - let sends = SendActivityTask::prepare( - &follow_with_ctx, - &local_actor, - vec![remote_actor.inbox()], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some activity deliveries failed permanently"); - } - - let remote = RemoteActor { - url: remote_actor.ap_id.to_string(), - handle: remote_actor.username.clone(), - inbox_url: remote_actor.inbox_url.to_string(), - shared_inbox_url: None, - display_name: Some(remote_actor.username.clone()), - }; - data.federation_repo - .add_following(local_user_id, remote, &follow_id_str) - .await?; - - Ok(()) - } - - pub async fn unfollow(&self, local_user_id: UserId, actor_url_str: &str) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - let remote = data - .federation_repo - .get_remote_actor(actor_url_str) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; - - let local_actor = get_local_actor(local_user_id.clone(), &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_ap_id = Url::parse(actor_url_str)?; - let inbox = Url::parse(&remote.inbox_url)?; - - let follow_activity_id_str = data - .federation_repo - .get_follow_activity_id(local_user_id.clone(), actor_url_str) - .await?; - let follow_id = match follow_activity_id_str { - Some(id) => Url::parse(&id)?, - None => crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - }; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: ObjectId::from(remote_ap_id), - }; - - let undo_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let undo = UndoActivity { - id: undo_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: follow, - }; - let undo_with_ctx = WithContext::new_default(undo); - - let sends = - SendActivityTask::prepare(&undo_with_ctx, &local_actor, vec![inbox], &data).await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some activity deliveries failed permanently"); - } - - data.federation_repo - .remove_following(local_user_id, actor_url_str) - .await?; - - data.federation_repo - .delete_remote_reviews_by_actor(actor_url_str) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - Ok(()) - } - - pub async fn get_following(&self, local_user_id: UserId) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - data.federation_repo.get_following(local_user_id).await - } - - pub async fn count_following(&self, local_user_id: UserId) -> anyhow::Result { - let data = self.federation_config.to_request_data(); - data.federation_repo.count_following(local_user_id).await - } - - pub async fn accept_follower( - &self, - local_user_id: UserId, - remote_actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id.clone(), &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_actor = data - .federation_repo - .get_remote_actor(remote_actor_url) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - - let follow_id_str = data - .federation_repo - .get_follower_follow_activity_id(local_user_id.clone(), remote_actor_url) - .await? - .ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?; - let follow_id = Url::parse(&follow_id_str)?; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(Url::parse(remote_actor_url)?), - object: ObjectId::from(local_actor.ap_id.clone()), - }; - let accept = AcceptActivity { - id: crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: follow, - }; - - // Update status first so local state is correct even if delivery fails - data.federation_repo - .update_follower_status(local_user_id.clone(), remote_actor_url, FollowerStatus::Accepted) - .await?; - - let inbox = Url::parse(&remote_actor.inbox_url)?; - let sends = SendActivityTask::prepare( - &WithContext::new_default(accept), - &local_actor, - vec![inbox.clone()], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!("failed to deliver Accept activity, but follower is marked accepted locally"); - } - - self.spawn_backfill(local_user_id, remote_actor.inbox_url.clone()); - - Ok(()) - } - - pub async fn reject_follower( - &self, - local_user_id: UserId, - remote_actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id.clone(), &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_actor = data - .federation_repo - .get_remote_actor(remote_actor_url) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - - let follow_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(Url::parse(remote_actor_url)?), - object: ObjectId::from(local_actor.ap_id.clone()), - }; - let reject = RejectActivity { - id: crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: follow, - }; - - let inbox = Url::parse(&remote_actor.inbox_url)?; - let sends = SendActivityTask::prepare( - &WithContext::new_default(reject), - &local_actor, - vec![inbox], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some activity deliveries failed permanently"); - } - - data.federation_repo - .remove_follower(local_user_id, remote_actor_url) - .await?; - - Ok(()) - } - - pub async fn get_pending_followers( - &self, - local_user_id: UserId, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .get_pending_followers(local_user_id) - .await - } - - pub async fn get_accepted_followers( - &self, - local_user_id: UserId, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - let followers = data.federation_repo.get_followers(local_user_id).await?; - Ok(followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .map(|f| f.actor) - .collect()) - } - - pub async fn count_accepted_followers(&self, local_user_id: UserId) -> anyhow::Result { - let data = self.federation_config.to_request_data(); - let followers = data.federation_repo.get_followers(local_user_id).await?; - Ok(followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .count()) - } - - pub async fn remove_follower( - &self, - local_user_id: UserId, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo.remove_follower(local_user_id, actor_url).await - } - - fn spawn_backfill(&self, owner_user_id: UserId, follower_inbox_url: String) { - let config = self.federation_config.clone(); - let base_url = self.base_url.clone(); - - tokio::spawn(async move { - if let Err(e) = ActivityPubService::run_backfill( - config, base_url, owner_user_id, follower_inbox_url, - ).await { - tracing::warn!(error = %e, "backfill: task failed"); - } - }); - } - - async fn run_backfill( - config: ApFederationConfig, - base_url: String, - owner_user_id: UserId, - follower_inbox_url: String, - ) -> anyhow::Result<()> { - const BATCH_SIZE: usize = 20; - - let data = config.to_request_data(); - let local_actor = get_local_actor(owner_user_id.clone(), &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let inbox = Url::parse(&follower_inbox_url)?; - - let history = data.movie_repo.get_user_history(&owner_user_id).await?; - let mut local_reviews: Vec<_> = history - .into_iter() - .filter(|e| matches!(e.review().source(), domain::models::ReviewSource::Local)) - .collect(); - local_reviews.reverse(); // oldest first so Mastodon feed is chronological - - let total = local_reviews.len(); - - let mut success_count = 0usize; - let mut failure_count = 0usize; - - for chunk in local_reviews.chunks(BATCH_SIZE) { - for entry in chunk { - match ActivityPubService::deliver_review_to_inbox( - entry.review().clone(), - &local_actor, - inbox.clone(), - &data, - &base_url, - ) - .await - { - Ok(_) => success_count += 1, - Err(_) => failure_count += 1, - } - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - tracing::info!( - user_id = %owner_user_id.value(), - follower = %follower_inbox_url, - sent = success_count, - failed = failure_count, - total = total, - "backfill complete" - ); - Ok(()) - } - - async fn deliver_review_to_inbox( - review: domain::models::Review, - local_actor: &DbActor, - inbox: Url, - data: &Data, - base_url: &str, - ) -> anyhow::Result<()> { - use activitypub_federation::traits::Object; - use crate::objects::DbReview; - - let review_id = review.id().clone(); - let ap_id = crate::urls::review_url(base_url, &review_id); - let db_review = DbReview { review, ap_id }; - let object = db_review.into_json(data).await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let activity_id = crate::urls::create_activity_url(base_url, &review_id) - .map_err(|e| anyhow::anyhow!("{e}"))?; - let create = CreateActivity { - id: activity_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object, - }; - - let sends = SendActivityTask::prepare( - &WithContext::new_default(create), - local_actor, - vec![inbox], - data, - ).await?; - let failures = send_with_retry(sends, data).await; - if let Some(e) = failures.into_iter().next() { - return Err(e); - } - Ok(()) - } -} diff --git a/crates/adapters/activitypub/src/webfinger.rs b/crates/adapters/activitypub/src/webfinger.rs deleted file mode 100644 index 2c013c7..0000000 --- a/crates/adapters/activitypub/src/webfinger.rs +++ /dev/null @@ -1,46 +0,0 @@ -use activitypub_federation::{ - config::Data, - fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger}, -}; -use axum::{ - extract::Query, - http::header, - response::{IntoResponse, Response}, -}; -use serde::Deserialize; - -use domain::value_objects::Username; - -use crate::data::FederationData; -use crate::error::Error; - -#[derive(Deserialize)] -pub struct WebfingerQuery { - resource: String, -} - -pub async fn webfinger_handler( - Query(query): Query, - data: Data, -) -> Result { - let name = extract_webfinger_name(&query.resource, &data)?; - - let username = Username::new(name.to_string()) - .map_err(|e| Error::bad_request(anyhow::anyhow!(e.to_string())))?; - let user = data - .user_repo - .find_by_username(&username) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let ap_id = crate::urls::actor_url(&data.base_url, user.id()); - - let wf: Webfinger = build_webfinger_response(query.resource, ap_id); - let body = serde_json::to_string(&wf) - .map_err(|e| Error::from(anyhow::anyhow!(e)))?; - Ok(( - [(header::CONTENT_TYPE, "application/jrd+json")], - body, - ).into_response()) -}