diff --git a/src/service.rs b/src/service.rs deleted file mode 100644 index 91a6aff..0000000 --- a/src/service.rs +++ /dev/null @@ -1,1636 +0,0 @@ -use std::fmt::Debug; -use std::sync::Arc; - -use activitypub_federation::{ - activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext, - traits::{Activity, Actor}, -}; -use axum::{Router, extract::DefaultBodyLimit, routing::get, routing::post}; -use serde::Serialize; -use url::Url; - -use crate::{ - activities::{ - AcceptActivity, CreateActivity, FollowActivity, RejectActivity, UndoActivity, - UpdateActivity, - }, - actor_handler::actor_handler, - actors::{DbActor, get_local_actor}, - content::ApObjectHandler, - data::{FederationData, FederationEvent}, - error::Error, - federation::ApFederationConfig, - followers_handler::{followers_handler, following_handler}, - inbox::inbox_handler, - nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, - outbox::outbox_handler, - repository::{ - BlockedDomain, FederationRepository, FollowerStatus, FollowingStatus, RemoteActor, - }, - urls::activity_url, - user::ApUserRepository, - webfinger::webfinger_handler, -}; - -/// Maximum retries for immediate in-process delivery attempts. -pub const DELIVERY_MAX_ATTEMPTS: u32 = 3; -/// Initial backoff before first retry (doubles each attempt). -pub const DELIVERY_INITIAL_DELAY_SECS: u64 = 1; -/// HTTP request timeout when fetching remote AP resources. -pub const HTTP_FETCH_TIMEOUT_SECS: u64 = 30; -/// Sleep between backfill batches to avoid overwhelming remote servers. -pub const BATCH_FETCH_SLEEP_MS: u64 = 100; - -#[allow(dead_code)] -fn content_to_html(text: &str) -> String { - let escaped = text - .replace('&', "&") - .replace('<', "<") - .replace('>', ">") - .replace('"', """); - let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect(); - if paragraphs.is_empty() { - format!("

{}

", escaped) - } else { - paragraphs - .iter() - .map(|p| format!("

{}

", p)) - .collect::>() - .join("") - } -} - -pub(crate) async fn send_with_retry( - sends: Vec, - data: &activitypub_federation::config::Data, - max_attempts: u32, - initial_delay_secs: u64, -) -> Vec { - let mut failures = vec![]; - for send in sends { - let mut delay = std::time::Duration::from_secs(initial_delay_secs); - for attempt in 1..=max_attempts { - match send.clone().sign_and_send(data).await { - Ok(()) => break, - Err(e) if attempt < max_attempts => { - tracing::warn!(attempt, error = %e, "delivery failed, retrying"); - tokio::time::sleep(delay).await; - delay *= 2; - } - Err(e) => { - tracing::error!(attempt, error = %e, "delivery failed permanently"); - failures.push(anyhow::anyhow!(e)); - } - } - } - } - failures -} - -/// Wraps a pre-serialized AP activity JSON for re-signing via `SendActivityTask::prepare`. -/// Used by `deliver_to_inbox` when a consumer re-presents a persisted queue item. -#[derive(Debug)] -struct RawActivity { - id: Url, - actor_url: Url, - value: serde_json::Value, -} - -impl Serialize for RawActivity { - fn serialize(&self, s: S) -> Result { - self.value.serialize(s) - } -} - -#[async_trait::async_trait] -impl Activity for RawActivity { - type DataType = FederationData; - type Error = Error; - - fn id(&self) -> &Url { - &self.id - } - fn actor(&self) -> &Url { - &self.actor_url - } - async fn verify( - &self, - _data: &activitypub_federation::config::Data, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn receive( - self, - _data: &activitypub_federation::config::Data, - ) -> Result<(), Self::Error> { - Ok(()) - } -} - -#[derive(Clone)] -pub struct ActivityPubService { - federation_config: ApFederationConfig, - base_url: String, - delivery_max_attempts: u32, - delivery_initial_delay_secs: u64, -} - -pub struct ActivityPubServiceBuilder { - repo: Arc, - user_repo: Arc, - object_handler: Arc, - base_url: String, - allow_registration: bool, - software_name: String, - debug: bool, - event_publisher: Option>, - delivery_max_attempts: u32, - delivery_initial_delay_secs: u64, -} - -impl ActivityPubServiceBuilder { - pub fn allow_registration(mut self, v: bool) -> Self { - self.allow_registration = v; - self - } - pub fn software_name(mut self, v: impl Into) -> Self { - self.software_name = v.into(); - self - } - pub fn debug(mut self, v: bool) -> Self { - self.debug = v; - self - } - pub fn event_publisher(mut self, v: Arc) -> Self { - self.event_publisher = Some(v); - self - } - /// Max delivery retries per inbox per attempt (default: 3). - pub fn delivery_max_attempts(mut self, v: u32) -> Self { - self.delivery_max_attempts = v; - self - } - /// Initial retry backoff in seconds, doubles each attempt (default: 1). - pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { - self.delivery_initial_delay_secs = v; - self - } - pub async fn build(self) -> anyhow::Result { - let data = FederationData::new( - self.repo, - self.user_repo, - self.object_handler, - self.base_url.clone(), - self.allow_registration, - self.software_name, - self.event_publisher, - ); - let federation_config = ApFederationConfig::new(data, self.debug).await?; - Ok(ActivityPubService { - federation_config, - base_url: self.base_url, - delivery_max_attempts: self.delivery_max_attempts, - delivery_initial_delay_secs: self.delivery_initial_delay_secs, - }) - } -} - -impl ActivityPubService { - pub fn builder( - repo: Arc, - user_repo: Arc, - object_handler: Arc, - base_url: impl Into, - ) -> ActivityPubServiceBuilder { - ActivityPubServiceBuilder { - repo, - user_repo, - object_handler, - base_url: base_url.into(), - allow_registration: false, - software_name: String::new(), - debug: false, - event_publisher: None, - delivery_max_attempts: DELIVERY_MAX_ATTEMPTS, - delivery_initial_delay_secs: DELIVERY_INITIAL_DELAY_SECS, - } - } - - pub fn federation_config(&self) -> &ApFederationConfig { - &self.federation_config - } - - pub fn request_data(&self) -> activitypub_federation::config::Data { - self.federation_config.to_request_data() - } - - pub fn base_url(&self) -> &str { - &self.base_url - } - - /// Route outbound deliveries: publish [`FederationEvent::DeliveryRequested`] when an - /// [`crate::data::EventPublisher`] is configured, otherwise spawn a fire-and-forget task. - /// - /// `sends` — pre-prepared `SendActivityTask` objects (used in the spawn path). - /// `activity_json` — serialized activity (used in the EventPublisher path). - /// `inboxes` — target inbox URLs (used in the EventPublisher path). - /// - /// Both `sends` and `inboxes` are prepared by the caller from the same activity so there - /// is no double-serialisation overhead on either path. - async fn dispatch_deliveries( - &self, - data: &activitypub_federation::config::Data, - local_actor: &DbActor, - inboxes: Vec, - sends: Vec, - activity_json: serde_json::Value, - ) -> anyhow::Result<()> { - if let Some(publisher) = data.event_publisher.as_ref() { - for inbox in inboxes { - let event = FederationEvent::DeliveryRequested { - inbox, - activity: activity_json.clone(), - signing_actor_id: local_actor.user_id, - }; - if let Err(e) = publisher.publish(event).await { - tracing::warn!(error = %e, "failed to enqueue DeliveryRequested event"); - } - } - } else { - let data = data.clone(); - let max_attempts = self.delivery_max_attempts; - let initial_delay = self.delivery_initial_delay_secs; - tokio::spawn(async move { - let failures = - send_with_retry(sends, &data, max_attempts, initial_delay).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some deliveries failed permanently"); - } - }); - } - Ok(()) - } - - /// Deliver a single outbound activity to `inbox`. Call this from a job-queue consumer - /// that received a [`FederationEvent::DeliveryRequested`] event. - /// - /// `activity` must be a fully-serialized AP activity (with `@context`). On permanent - /// failure a [`FederationEvent::DeliveryFailed`] event is published if an - /// [`crate::data::EventPublisher`] is configured. - pub async fn deliver_to_inbox( - &self, - inbox: url::Url, - activity: serde_json::Value, - signing_actor_id: uuid::Uuid, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let actor = get_local_actor(signing_actor_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let id = activity - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| Url::parse(s).ok()) - .unwrap_or_else(|| actor.ap_id.clone()); - let actor_url = activity - .get("actor") - .and_then(|v| v.as_str()) - .and_then(|s| Url::parse(s).ok()) - .unwrap_or_else(|| actor.ap_id.clone()); - - let raw = RawActivity { - id, - actor_url, - value: activity.clone(), - }; - let sends = - SendActivityTask::prepare(&raw, &actor, vec![inbox.clone()], &data).await?; - let failures = send_with_retry( - sends, - &data, - self.delivery_max_attempts, - self.delivery_initial_delay_secs, - ) - .await; - if failures.is_empty() { - return Ok(()); - } - let error_msg = failures - .iter() - .map(|e| e.to_string()) - .collect::>() - .join("; "); - if let Some(publisher) = data.event_publisher.as_ref() { - let _ = publisher - .publish(FederationEvent::DeliveryFailed { - inbox, - activity, - signing_actor_id, - error: error_msg.clone(), - }) - .await; - } - Err(anyhow::anyhow!("delivery failed: {}", error_msg)) - } - - /// Returns `(local_actor, deduplicated_inboxes)` for accepted followers via - /// the `get_accepted_follower_inboxes` repo method (DB-side filtering). - async fn accepted_follower_inboxes( - &self, - data: &activitypub_federation::config::Data, - local_user_id: uuid::Uuid, - ) -> anyhow::Result)>> { - let local_actor = get_local_actor(local_user_id, data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let inbox_strs = data - .federation_repo - .get_accepted_follower_inboxes(local_user_id) - .await?; - - if inbox_strs.is_empty() { - return Ok(None); - } - - let inboxes: Vec = inbox_strs - .into_iter() - .filter_map(|s| { - Url::parse(&s) - .map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")) - .ok() - }) - .collect(); - - if inboxes.is_empty() { - return Ok(None); - } - - Ok(Some((local_actor, inboxes))) - } - - /// Helper: serialize `activity` to JSON and prepare `SendActivityTask` objects. - /// Returns (activity_json, sends, inboxes_clone) so both dispatch paths have what they need. - async fn prepare_broadcast( - &self, - data: &activitypub_federation::config::Data, - local_actor: &DbActor, - inboxes: Vec, - activity: A, - ) -> anyhow::Result<(serde_json::Value, Vec, Vec)> - where - A: Activity + Serialize + Debug + Send + Sync, - { - let with_ctx = WithContext::new_default(activity); - // Borrow for JSON (does not move with_ctx). - let activity_json = serde_json::to_value(&with_ctx)?; - // Borrow for prepare (does not move with_ctx). - let sends = - SendActivityTask::prepare(&with_ctx, local_actor, inboxes.clone(), data).await?; - Ok((activity_json, sends, inboxes)) - } - - pub async fn followers_collection_json( - &self, - user_id: uuid::Uuid, - page: Option, - ) -> anyhow::Result { - const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; - const PAGE_SIZE: usize = 20; - let data = self.federation_config.to_request_data(); - let collection_id = format!("{}/users/{}/followers", self.base_url, user_id); - let total = data.federation_repo.count_followers(user_id).await?; - let obj = if let Some(p) = page { - let p = p.max(1); - let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; - let followers = data - .federation_repo - .get_followers_page(user_id, offset as u32, PAGE_SIZE) - .await?; - let has_next = offset + followers.len() < total; - let items: Vec = followers.into_iter().map(|f| f.actor.url).collect(); - let mut obj = serde_json::json!({ - "@context": AP_CONTEXT, - "type": "OrderedCollectionPage", - "id": format!("{}?page={}", collection_id, p), - "partOf": collection_id, - "totalItems": total, - "orderedItems": items, - }); - if has_next { - obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1)); - } - obj - } else { - serde_json::json!({ - "@context": AP_CONTEXT, - "type": "OrderedCollection", - "id": collection_id, - "totalItems": total, - "first": format!("{}?page=1", collection_id), - }) - }; - Ok(serde_json::to_string(&obj)?) - } - - pub async fn following_collection_json( - &self, - user_id: uuid::Uuid, - page: Option, - ) -> anyhow::Result { - const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; - const PAGE_SIZE: usize = 20; - let data = self.federation_config.to_request_data(); - let collection_id = format!("{}/users/{}/following", self.base_url, user_id); - let total = data.federation_repo.count_following(user_id).await?; - let obj = if let Some(p) = page { - let p = p.max(1); - let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; - let following = data - .federation_repo - .get_following_page(user_id, offset as u32, PAGE_SIZE) - .await?; - let has_next = offset + following.len() < total; - let items: Vec = following.into_iter().map(|a| a.url).collect(); - let mut obj = serde_json::json!({ - "@context": AP_CONTEXT, - "type": "OrderedCollectionPage", - "id": format!("{}?page={}", collection_id, p), - "partOf": collection_id, - "totalItems": total, - "orderedItems": items, - }); - if has_next { - obj["next"] = serde_json::json!(format!("{}?page={}", collection_id, p + 1)); - } - obj - } else { - serde_json::json!({ - "@context": AP_CONTEXT, - "type": "OrderedCollection", - "id": collection_id, - "totalItems": total, - "first": format!("{}?page=1", collection_id), - }) - }; - Ok(serde_json::to_string(&obj)?) - } - - pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result { - use activitypub_federation::traits::Object; - let uuid = uuid::Uuid::parse_str(user_id_str)?; - let data = self.federation_config.to_request_data(); - let actor = get_local_actor(uuid, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let person = actor - .into_json(&data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - Ok(serde_json::to_string(&WithContext::new_default(person))?) - } - - pub async fn mark_follower_accepted( - &self, - user_id: uuid::Uuid, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted) - .await - .map_err(|e| anyhow::anyhow!("{e}")) - } - - pub async fn mark_follower_rejected( - &self, - user_id: uuid::Uuid, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .remove_follower(user_id, actor_url) - .await - .map_err(|e| anyhow::anyhow!("{e}")) - } - - pub async fn lookup_actor_by_handle( - &self, - handle: &str, - ) -> anyhow::Result { - tracing::info!(handle, "looking up remote actor"); - let data = self.federation_config.to_request_data(); - let actor = Self::webfinger_https(handle, &data) - .await - .inspect_err(|e| tracing::warn!(handle, error = %e, "actor lookup failed"))?; - let domain = actor.ap_id.host_str().unwrap_or("").to_string(); - let handle = format!("{}@{}", actor.username, domain); - tracing::info!(handle, ap_url = %actor.ap_id, "remote actor resolved"); - Ok(crate::user::LookedUpActor { - handle, - display_name: actor.display_name, - bio: actor.bio, - avatar_url: actor.avatar_url, - banner_url: actor.banner_url, - ap_url: actor.ap_id, - outbox_url: Some(actor.outbox_url), - followers_url: Some(actor.followers_url), - following_url: Some(actor.following_url), - also_known_as: actor.also_known_as, - profile_url: actor.profile_url, - attachment: actor.attachment, - }) - } - - /// Returns the ActivityPub router. Inbox routes enforce a 1 MB body limit. - pub fn router(&self) -> Router - where - S: Clone + Send + Sync + 'static, - { - Router::new() - .route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler)) - .route("/nodeinfo/2.0", get(nodeinfo_handler)) - .route("/.well-known/webfinger", get(webfinger_handler)) - .route( - "/inbox", - post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024)), - ) - .route("/users/{id}", get(actor_handler)) - .route( - "/users/{id}/inbox", - post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024)), - ) - .route("/users/{id}/outbox", get(outbox_handler)) - .route("/users/{id}/followers", get(followers_handler)) - .route("/users/{id}/following", get(following_handler)) - .layer(self.federation_config.middleware()) - } - - pub async fn broadcast_announce_to_followers( - &self, - local_user_id: uuid::Uuid, - object_ap_id: url::Url, - ) -> anyhow::Result<()> { - let announce_id = url::Url::parse(&format!( - "{}/activities/announce/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", local_user_id, object_ap_id).as_bytes(), - ) - )) - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let announce = crate::activities::AnnounceActivity { - id: announce_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: object_ap_id, - published: Some(chrono::Utc::now()), - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, inboxes, announce).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_undo_announce_to_followers( - &self, - local_user_id: uuid::Uuid, - object_ap_id: url::Url, - ) -> anyhow::Result<()> { - let announce_id = url::Url::parse(&format!( - "{}/activities/announce/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", local_user_id, object_ap_id).as_bytes(), - ) - )) - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let undo_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let undo = crate::activities::UndoActivity { - id: undo_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: serde_json::json!({ - "type": "Announce", - "id": announce_id.to_string(), - "actor": local_actor.ap_id.to_string(), - "object": object_ap_id.to_string(), - }), - }; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_like_to_inbox( - &self, - liker_user_id: uuid::Uuid, - object_ap_id: url::Url, - author_inbox_url: url::Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(liker_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let like_id = url::Url::parse(&format!( - "{}/activities/like/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", liker_user_id, object_ap_id).as_bytes(), - ) - ))?; - - let like = crate::activities::LikeActivity { - id: like_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: object_ap_id, - }; - let (json, sends, inboxes) = self - .prepare_broadcast(&data, &local_actor, vec![author_inbox_url], like) - .await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_undo_like_to_inbox( - &self, - liker_user_id: uuid::Uuid, - object_ap_id: url::Url, - author_inbox_url: url::Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(liker_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let like_id = url::Url::parse(&format!( - "{}/activities/like/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", liker_user_id, object_ap_id).as_bytes(), - ) - ))?; - - let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - - let undo = crate::activities::UndoActivity { - id: undo_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::json!({ - "type": "Like", - "id": like_id.to_string(), - "actor": local_actor.ap_id.to_string(), - "object": object_ap_id.to_string(), - }), - }; - let (json, sends, inboxes) = self - .prepare_broadcast(&data, &local_actor, vec![author_inbox_url], undo) - .await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - async fn webfinger_https( - handle: &str, - data: &activitypub_federation::config::Data, - ) -> anyhow::Result { - let normalized = handle.trim_start_matches('@'); - let at = normalized - .rfind('@') - .ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?; - let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]); - let wf_url = format!( - "https://{}/.well-known/webfinger?resource=acct:{}@{}", - domain_str, user, domain_str - ); - tracing::debug!(handle, wf_url, "resolving webfinger"); - let wf: serde_json::Value = reqwest::Client::new() - .get(&wf_url) - .header("Accept", "application/jrd+json, application/json") - .send() - .await? - .json() - .await?; - let self_href = wf["links"] - .as_array() - .and_then(|links| { - links.iter().find(|l| { - l["rel"].as_str() == Some("self") - && l["type"].as_str() == Some("application/activity+json") - }) - }) - .and_then(|l| l["href"].as_str()) - .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))? - .to_owned(); - tracing::debug!(handle, self_href, "webfinger resolved, fetching actor"); - let self_url = url::Url::parse(&self_href)?; - let actor: DbActor = ObjectId::from(self_url) - .dereference(data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - Ok(actor) - } - - pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - let normalized = handle.trim_start_matches('@'); - let parts: Vec<&str> = normalized.splitn(2, '@').collect(); - if parts.len() == 2 && parts[1] == data.domain { - return self.follow_local(local_user_id, parts[0], &data).await; - } - - let remote_actor: DbActor = Self::webfinger_https(handle, &data).await?; - - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let follow_id_str = follow_id.to_string(); - - let domain = remote_actor.ap_id.host_str().unwrap_or(""); - let full_handle = format!("{}@{}", remote_actor.username, domain); - let remote = RemoteActor { - url: remote_actor.ap_id.to_string(), - handle: full_handle, - inbox_url: remote_actor.inbox_url.to_string(), - shared_inbox_url: remote_actor.shared_inbox_url.as_ref().map(|u| u.to_string()), - display_name: Some(remote_actor.username.clone()), - avatar_url: remote_actor.avatar_url.as_ref().map(|u| u.to_string()), - outbox_url: Some(remote_actor.outbox_url.to_string()), - }; - - // Save BEFORE delivering Follow — prevents lost state if process restarts - // between delivery and the DB write. - data.federation_repo - .add_following(local_user_id, remote, &follow_id_str) - .await?; - - let follow = FollowActivity { - id: Url::parse(&follow_id_str)?, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: ObjectId::from(remote_actor.ap_id.clone()), - }; - let (json, sends, inboxes) = self - .prepare_broadcast(&data, &local_actor, vec![remote_actor.inbox()], follow) - .await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn unfollow( - &self, - local_user_id: uuid::Uuid, - actor_url_str: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - if actor_url_str.starts_with(&self.base_url) { - return self.unfollow_local(local_user_id, actor_url_str, &data).await; - } - - let remote = data - .federation_repo - .get_remote_actor(actor_url_str) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; - - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_ap_id = Url::parse(actor_url_str)?; - let inbox = Url::parse(&remote.inbox_url)?; - - let follow_activity_id_str = data - .federation_repo - .get_follow_activity_id(local_user_id, actor_url_str) - .await?; - let follow_id = match follow_activity_id_str { - Some(id) => Url::parse(&id)?, - None => activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - }; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: ObjectId::from(remote_ap_id), - }; - - let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let undo = UndoActivity { - id: undo_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::to_value(&follow).map_err(|e| anyhow::anyhow!("{e}"))?, - }; - - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, vec![inbox], undo).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await?; - - data.federation_repo - .remove_following(local_user_id, actor_url_str) - .await?; - - data.object_handler - .on_actor_removed(&Url::parse(actor_url_str)?) - .await?; - - Ok(()) - } - - pub async fn accept_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_actor = data - .federation_repo - .get_remote_actor(remote_actor_url) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - - let follow_id_str = data - .federation_repo - .get_follower_follow_activity_id(local_user_id, remote_actor_url) - .await? - .ok_or_else(|| { - anyhow::anyhow!("follow activity id not found for {}", remote_actor_url) - })?; - let follow_id = Url::parse(&follow_id_str)?; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(Url::parse(remote_actor_url)?), - object: ObjectId::from(local_actor.ap_id.clone()), - }; - let accept = AcceptActivity { - id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: follow, - }; - - // Mark accepted BEFORE delivering Accept. Local state is authoritative; - // if delivery fails, the consumer's job queue retries it. - data.federation_repo - .update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted) - .await?; - - let inbox = Url::parse(&remote_actor.inbox_url)?; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, vec![inbox], accept).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await?; - - let target_inbox = remote_actor - .shared_inbox_url - .clone() - .unwrap_or_else(|| remote_actor.inbox_url.clone()); - self.spawn_backfill(local_user_id, target_inbox); - - Ok(()) - } - - pub async fn reject_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let remote_actor = data - .federation_repo - .get_remote_actor(remote_actor_url) - .await? - .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; - - let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let follow = FollowActivity { - id: follow_id, - kind: Default::default(), - actor: ObjectId::from(Url::parse(remote_actor_url)?), - object: ObjectId::from(local_actor.ap_id.clone()), - }; - let reject = RejectActivity { - id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: follow, - }; - - let inbox = Url::parse(&remote_actor.inbox_url)?; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, vec![inbox], reject).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await?; - - data.federation_repo - .remove_follower(local_user_id, remote_actor_url) - .await?; - - Ok(()) - } - - pub async fn get_pending_followers( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .get_pending_followers(local_user_id) - .await - } - - pub async fn get_accepted_followers( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - let followers = data.federation_repo.get_followers(local_user_id).await?; - Ok(followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .map(|f| f.actor) - .collect()) - } - - pub async fn count_accepted_followers( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result { - let data = self.federation_config.to_request_data(); - let followers = data.federation_repo.get_followers(local_user_id).await?; - Ok(followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .count()) - } - - pub async fn get_following( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - data.federation_repo.get_following(local_user_id).await - } - - pub async fn count_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result { - let data = self.federation_config.to_request_data(); - data.federation_repo.count_following(local_user_id).await - } - - pub async fn remove_follower( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .remove_follower(local_user_id, actor_url) - .await - } - - pub async fn broadcast_delete_to_followers( - &self, - local_user_id: uuid::Uuid, - ap_id: Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let delete_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let delete = crate::activities::DeleteActivity { - id: delete_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::json!(ap_id.to_string()), - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, inboxes, delete).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_add_to_followers( - &self, - local_user_id: uuid::Uuid, - ap_id: Url, - object: serde_json::Value, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let add = crate::activities::AddActivity { - id: ap_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, inboxes, add).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_undo_add_to_followers( - &self, - local_user_id: uuid::Uuid, - watchlist_entry_ap_id: Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let undo_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let undo = crate::activities::UndoActivity { - id: undo_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::json!({ - "type": "Add", - "id": watchlist_entry_ap_id.as_str(), - "object": { "id": watchlist_entry_ap_id.as_str() } - }), - }; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_create_note( - &self, - local_user_id: uuid::Uuid, - note: serde_json::Value, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let note_id_str = note["id"].as_str().unwrap_or(""); - let create_id = Url::parse(&format!( - "{}/activities/create/{}", - self.base_url, - uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, note_id_str.as_bytes()) - )) - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let create = crate::activities::CreateActivity { - id: create_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: note, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - bto: vec![], - bcc: vec![], - }; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, inboxes, create).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_update_note( - &self, - local_user_id: uuid::Uuid, - note: serde_json::Value, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let Some((local_actor, inboxes)) = - self.accepted_follower_inboxes(&data, local_user_id).await? - else { - return Ok(()); - }; - - let update_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - - let update = crate::activities::UpdateActivity { - id: update_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: note, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> { - use activitypub_federation::traits::Object; - - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let person = local_actor - .clone() - .into_json(&data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let person_json = serde_json::to_value(WithContext::new_default(person))?; - - let update_id = Url::parse(&format!( - "{}/activities/update/{}", - self.base_url, - uuid::Uuid::new_v4() - ))?; - - let update = UpdateActivity { - id: update_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: person_json, - to: vec![crate::urls::AS_PUBLIC.to_string()], - cc: vec![local_actor.followers_url.to_string()], - }; - - let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else { - tracing::info!(%user_id, "no accepted followers, skipping actor update broadcast"); - return Ok(()); - }; - - tracing::info!(%user_id, inbox_count = inboxes.len(), "broadcasting actor update"); - let (json, sends, inboxes) = - self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await - } - - pub async fn broadcast_move( - &self, - user_id: uuid::Uuid, - new_actor_url: url::Url, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else { - tracing::info!(%user_id, "broadcast_move: no accepted followers, nothing to send"); - return Ok(()); - }; - - let move_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - - let move_activity = crate::activities::MoveActivity { - id: move_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: local_actor.ap_id.clone(), - target: new_actor_url.clone(), - }; - - let (json, sends, inboxes) = self - .prepare_broadcast(&data, &local_actor, inboxes, move_activity) - .await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await?; - - tracing::info!( - %user_id, - target = %new_actor_url, - "broadcast_move: dispatched to all accepted followers" - ); - Ok(()) - } - - pub async fn block_actor( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - - data.federation_repo - .add_blocked_actor(local_user_id, actor_url) - .await?; - let _ = data - .federation_repo - .remove_follower(local_user_id, actor_url) - .await; - let _ = data - .federation_repo - .remove_following(local_user_id, actor_url) - .await; - - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - if let Ok(Some(remote_actor)) = data.federation_repo.get_remote_actor(actor_url).await { - let block_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - let block = crate::activities::BlockActivity { - id: block_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: Url::parse(actor_url)?, - }; - let inbox = Url::parse(&remote_actor.inbox_url)?; - let (json, sends, inboxes) = self - .prepare_broadcast(&data, &local_actor, vec![inbox], block) - .await?; - self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json) - .await?; - } - - Ok(()) - } - - pub async fn unblock_actor( - &self, - local_user_id: uuid::Uuid, - actor_url: &str, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .remove_blocked_actor(local_user_id, actor_url) - .await - } - - pub async fn get_blocked_actors( - &self, - local_user_id: uuid::Uuid, - ) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - let actor_urls = data - .federation_repo - .get_blocked_actors(local_user_id) - .await?; - let mut actors = Vec::new(); - for url in actor_urls { - let actor = match data.federation_repo.get_remote_actor(&url).await { - Ok(Some(a)) => a, - _ => RemoteActor { - url: url.clone(), - handle: url.clone(), - inbox_url: url.clone(), - shared_inbox_url: None, - display_name: None, - avatar_url: None, - outbox_url: None, - }, - }; - actors.push(actor); - } - Ok(actors) - } - - pub async fn add_blocked_domain( - &self, - domain: &str, - reason: Option<&str>, - ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo - .add_blocked_domain(domain, reason) - .await - } - - pub async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - data.federation_repo.remove_blocked_domain(domain).await - } - - pub async fn get_blocked_domains(&self) -> anyhow::Result> { - let data = self.federation_config.to_request_data(); - data.federation_repo.get_blocked_domains().await - } - - async fn follow_local( - &self, - local_user_id: uuid::Uuid, - target_username: &str, - data: &activitypub_federation::config::Data, - ) -> anyhow::Result<()> { - let target = data - .user_repo - .find_by_username(target_username) - .await? - .ok_or_else(|| anyhow::anyhow!("user not found: {}", target_username))?; - - if target.id == local_user_id { - return Err(anyhow::anyhow!("cannot follow yourself")); - } - - let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); - let target_actor_url = crate::urls::actor_url(&self.base_url, target.id); - let target_inbox_url = format!("{}/inbox", target_actor_url); - let follow_id = activity_url(&self.base_url) - .map_err(|e| anyhow::anyhow!("{e}"))? - .to_string(); - - data.federation_repo - .add_follower( - target.id, - &follower_actor_url, - FollowerStatus::Accepted, - &follow_id, - ) - .await?; - - let target_as_remote = RemoteActor { - url: target_actor_url.to_string(), - handle: format!("{}@{}", target.username, data.domain), - inbox_url: target_inbox_url, - shared_inbox_url: None, - display_name: Some(target.username), - avatar_url: None, - outbox_url: None, - }; - data.federation_repo - .add_following(local_user_id, target_as_remote, &follow_id) - .await?; - - data.federation_repo - .update_following_status( - local_user_id, - target_actor_url.as_ref(), - FollowingStatus::Accepted, - ) - .await?; - - tracing::info!(follower = %local_user_id, followee = %target.id, "local follow"); - Ok(()) - } - - async fn unfollow_local( - &self, - local_user_id: uuid::Uuid, - target_actor_url: &str, - data: &activitypub_federation::config::Data, - ) -> anyhow::Result<()> { - let target_url = Url::parse(target_actor_url)?; - let target_user_id = crate::urls::extract_user_id_from_url(&target_url) - .ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?; - - let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); - - data.federation_repo - .remove_follower(target_user_id, &local_actor_url) - .await?; - data.federation_repo - .remove_following(local_user_id, target_actor_url) - .await?; - - tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow"); - Ok(()) - } - - pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS)) - .build()?; - let data = self.federation_config.to_request_data(); - let actor = url::Url::parse(actor_url)?; - - let root: serde_json::Value = client - .get(outbox_url) - .header("Accept", "application/activity+json") - .send() - .await? - .json() - .await?; - - let first = match root.get("first").and_then(|v| v.as_str()) { - Some(url) => url.to_string(), - None => { - tracing::debug!(outbox = %outbox_url, "outbox has no first page, nothing to backfill"); - return Ok(()); - } - }; - - let mut current_url = first; - let mut visited = std::collections::HashSet::new(); - - loop { - if !visited.insert(current_url.clone()) { - tracing::warn!(url = %current_url, "backfill: loop detected, stopping"); - break; - } - - let page: serde_json::Value = match client - .get(¤t_url) - .header("Accept", "application/activity+json") - .send() - .await - { - Ok(resp) => match resp.json().await { - Ok(v) => v, - Err(e) => { - tracing::error!(error = %e, url = %current_url, "backfill: failed to parse page JSON"); - break; - } - }, - Err(e) => { - tracing::error!(error = %e, url = %current_url, "backfill: HTTP request failed"); - break; - } - }; - - if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) { - for item in items { - let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); - if activity_type != "Create" && activity_type != "Add" { - continue; - } - let object = match item.get("object") { - Some(o) if o.is_object() => o.clone(), - _ => continue, - }; - let ap_id = match object - .get("id") - .and_then(|v| v.as_str()) - .and_then(|s| url::Url::parse(s).ok()) - { - Some(u) => u, - None => continue, - }; - if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await { - tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item, skipping"); - } - } - } - - match page.get("next").and_then(|v| v.as_str()) { - Some(next) => current_url = next.to_string(), - None => break, - } - } - - tracing::info!(outbox = %outbox_url, pages = visited.len(), "backfill complete"); - Ok(()) - } - - fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { - let config = self.federation_config.clone(); - let base_url = self.base_url.clone(); - let max_attempts = self.delivery_max_attempts; - let initial_delay = self.delivery_initial_delay_secs; - tokio::spawn(async move { - if let Err(e) = ActivityPubService::run_backfill( - config, - base_url, - owner_user_id, - follower_inbox_url, - max_attempts, - initial_delay, - ) - .await - { - tracing::warn!(error = %e, "backfill: task failed"); - } - }); - } - - async fn run_backfill( - config: ApFederationConfig, - base_url: String, - owner_user_id: uuid::Uuid, - follower_inbox_url: String, - max_attempts: u32, - initial_delay: u64, - ) -> anyhow::Result<()> { - const BATCH_SIZE: usize = 20; - - let data = config.to_request_data(); - let local_actor = get_local_actor(owner_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let inbox = Url::parse(&follower_inbox_url)?; - - let mut objects = data - .object_handler - .get_local_objects_for_user(owner_user_id) - .await?; - objects.reverse(); // oldest first → chronological feed - - let total = objects.len(); - let mut success_count = 0usize; - let mut failure_count = 0usize; - - for chunk in objects.chunks(BATCH_SIZE) { - for (ap_id, object_json) in chunk { - let create_id = Url::parse(&format!( - "{}/activities/create/{}", - base_url, - uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes()) - ))?; - - let create = CreateActivity { - id: create_id, - kind: Default::default(), - actor: ObjectId::from(local_actor.ap_id.clone()), - object: object_json.clone(), - to: vec![], - cc: vec![], - bto: vec![], - bcc: vec![], - }; - - let sends = SendActivityTask::prepare( - &WithContext::new_default(create), - &local_actor, - vec![inbox.clone()], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data, max_attempts, initial_delay).await; - if failures.is_empty() { - success_count += 1; - } else { - failure_count += 1; - } - } - tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS)).await; - } - - tracing::info!( - user_id = %owner_user_id, - follower = %follower_inbox_url, - sent = success_count, - failed = failure_count, - total = total, - "backfill complete" - ); - Ok(()) - } -} - -#[cfg(test)] -#[path = "tests/service.rs"] -mod tests; diff --git a/src/service/backfill.rs b/src/service/backfill.rs new file mode 100644 index 0000000..2554145 --- /dev/null +++ b/src/service/backfill.rs @@ -0,0 +1,109 @@ +use activitypub_federation::{activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext}; +use url::Url; + +use crate::{ + activities::CreateActivity, + actors::get_local_actor, + federation::ApFederationConfig, +}; + +use super::{ActivityPubService, delivery::send_with_retry}; + +impl ActivityPubService { + pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(super::HTTP_FETCH_TIMEOUT_SECS)) + .build()?; + let data = self.federation_config.to_request_data(); + let actor = url::Url::parse(actor_url)?; + let root: serde_json::Value = client.get(outbox_url).header("Accept", "application/activity+json").send().await?.json().await?; + let first = match root.get("first").and_then(|v| v.as_str()) { + Some(url) => url.to_string(), + None => { tracing::debug!(outbox = %outbox_url, "outbox has no first page"); return Ok(()); } + }; + let mut current_url = first; + let mut visited = std::collections::HashSet::new(); + loop { + if !visited.insert(current_url.clone()) { + tracing::warn!(url = %current_url, "backfill: loop detected, stopping"); + break; + } + let page: serde_json::Value = match client.get(¤t_url).header("Accept", "application/activity+json").send().await { + Ok(resp) => match resp.json().await { Ok(v) => v, Err(e) => { tracing::error!(error = %e, "backfill: failed to parse page JSON"); break; } }, + Err(e) => { tracing::error!(error = %e, "backfill: HTTP request failed"); break; } + }; + if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) { + for item in items { + let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); + if activity_type != "Create" && activity_type != "Add" { continue; } + let Some(object) = item.get("object").filter(|o| o.is_object()).cloned() else { continue }; + let Some(ap_id) = object.get("id").and_then(|v| v.as_str()).and_then(|s| url::Url::parse(s).ok()) else { continue }; + if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await { + tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item"); + } + } + } + match page.get("next").and_then(|v| v.as_str()) { + Some(next) => current_url = next.to_string(), + None => break, + } + } + tracing::info!(outbox = %outbox_url, pages = visited.len(), "backfill complete"); + Ok(()) + } + + /// Spawns the backfill task in the background. + /// `pub(crate)` so `service::follow` can call it from `accept_follower`. + pub(crate) fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { + let config = self.federation_config.clone(); + let base_url = self.base_url.clone(); + let max_attempts = self.delivery_max_attempts; + let initial_delay = self.delivery_initial_delay_secs; + tokio::spawn(async move { + if let Err(e) = ActivityPubService::run_backfill(config, base_url, owner_user_id, follower_inbox_url, max_attempts, initial_delay).await { + tracing::warn!(error = %e, "backfill: task failed"); + } + }); + } + + async fn run_backfill( + config: ApFederationConfig, + base_url: String, + owner_user_id: uuid::Uuid, + follower_inbox_url: String, + max_attempts: u32, + initial_delay: u64, + ) -> anyhow::Result<()> { + const BATCH_SIZE: usize = 20; + let data = config.to_request_data(); + let local_actor = get_local_actor(owner_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let inbox = Url::parse(&follower_inbox_url)?; + let mut objects = data.object_handler.get_local_objects_for_user(owner_user_id).await?; + objects.reverse(); + let total = objects.len(); + let (mut success_count, mut failure_count) = (0usize, 0usize); + for chunk in objects.chunks(BATCH_SIZE) { + for (ap_id, object_json) in chunk { + let create_id = Url::parse(&format!( + "{}/activities/create/{}", + base_url, + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes()) + ))?; + let create = CreateActivity { + id: create_id, kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: object_json.clone(), to: vec![], cc: vec![], bto: vec![], bcc: vec![], + }; + let sends = SendActivityTask::prepare(&WithContext::new_default(create), &local_actor, vec![inbox.clone()], &data).await?; + if send_with_retry(sends, &data, max_attempts, initial_delay).await.is_empty() { + success_count += 1; + } else { + failure_count += 1; + } + } + tokio::time::sleep(std::time::Duration::from_millis(super::BATCH_FETCH_SLEEP_MS)).await; + } + tracing::info!(user_id = %owner_user_id, follower = %follower_inbox_url, sent = success_count, failed = failure_count, total = total, "backfill complete"); + Ok(()) + } +} diff --git a/src/service/broadcast.rs b/src/service/broadcast.rs new file mode 100644 index 0000000..a8b36c4 --- /dev/null +++ b/src/service/broadcast.rs @@ -0,0 +1,259 @@ +use activitypub_federation::{fetch::object_id::ObjectId, protocol::context::WithContext, traits::Object}; +use url::Url; + +use crate::{ + activities::{ + AddActivity, AnnounceActivity, CreateActivity, DeleteActivity, + MoveActivity, UndoActivity, UpdateActivity, + }, + actors::get_local_actor, + urls::activity_url, +}; + +use super::ActivityPubService; + +impl ActivityPubService { + pub async fn broadcast_announce_to_followers( + &self, + local_user_id: uuid::Uuid, + object_ap_id: url::Url, + ) -> anyhow::Result<()> { + let announce_id = url::Url::parse(&format!( + "{}/activities/announce/{}", + self.base_url, + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", local_user_id, object_ap_id).as_bytes()), + )).map_err(|e| anyhow::anyhow!("{e}"))?; + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let announce = AnnounceActivity { + id: announce_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: object_ap_id, + published: Some(chrono::Utc::now()), + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, announce).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_undo_announce_to_followers( + &self, + local_user_id: uuid::Uuid, + object_ap_id: url::Url, + ) -> anyhow::Result<()> { + let announce_id = url::Url::parse(&format!( + "{}/activities/announce/{}", + self.base_url, + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", local_user_id, object_ap_id).as_bytes()), + )).map_err(|e| anyhow::anyhow!("{e}"))?; + let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let undo = UndoActivity { + id: undo_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: serde_json::json!({"type":"Announce","id":announce_id.to_string(),"actor":local_actor.ap_id.to_string(),"object":object_ap_id.to_string()}), + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_like_to_inbox( + &self, + liker_user_id: uuid::Uuid, + object_ap_id: url::Url, + author_inbox_url: url::Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(liker_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let like_id = url::Url::parse(&format!( + "{}/activities/like/{}", + self.base_url, + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", liker_user_id, object_ap_id).as_bytes()), + ))?; + let like = crate::activities::LikeActivity { + id: like_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: object_ap_id, + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![author_inbox_url], like).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_undo_like_to_inbox( + &self, + liker_user_id: uuid::Uuid, + object_ap_id: url::Url, + author_inbox_url: url::Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(liker_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let like_id = url::Url::parse(&format!( + "{}/activities/like/{}", + self.base_url, + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, format!("{}/{}", liker_user_id, object_ap_id).as_bytes()), + ))?; + let undo_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let undo = UndoActivity { + id: undo_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: serde_json::json!({"type":"Like","id":like_id.to_string(),"actor":local_actor.ap_id.to_string(),"object":object_ap_id.to_string()}), + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![author_inbox_url], undo).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_delete_to_followers( + &self, + local_user_id: uuid::Uuid, + ap_id: Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let delete = DeleteActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: serde_json::json!(ap_id.to_string()), + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, delete).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_add_to_followers( + &self, + local_user_id: uuid::Uuid, + ap_id: Url, + object: serde_json::Value, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let add = AddActivity { + id: ap_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, add).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_undo_add_to_followers( + &self, + local_user_id: uuid::Uuid, + watchlist_entry_ap_id: Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let undo = UndoActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: serde_json::json!({"type":"Add","id":watchlist_entry_ap_id.as_str(),"object":{"id":watchlist_entry_ap_id.as_str()}}), + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, undo).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_create_note( + &self, + local_user_id: uuid::Uuid, + note: serde_json::Value, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let note_id_str = note["id"].as_str().unwrap_or(""); + let create_id = Url::parse(&format!( + "{}/activities/create/{}", + self.base_url, + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, note_id_str.as_bytes()) + )).map_err(|e| anyhow::anyhow!("{e}"))?; + let create = CreateActivity { + id: create_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: note, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + bto: vec![], + bcc: vec![], + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, create).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_update_note( + &self, + local_user_id: uuid::Uuid, + note: serde_json::Value, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); }; + let update = crate::activities::UpdateActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: note, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let person = local_actor.clone().into_json(&data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let person_json = serde_json::to_value(WithContext::new_default(person))?; + let update_id = Url::parse(&format!("{}/activities/update/{}", self.base_url, uuid::Uuid::new_v4()))?; + let update = UpdateActivity { + id: update_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: person_json, + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else { + tracing::info!(%user_id, "no accepted followers, skipping actor update broadcast"); + return Ok(()); + }; + tracing::info!(%user_id, inbox_count = inboxes.len(), "broadcasting actor update"); + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, update).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn broadcast_move( + &self, + user_id: uuid::Uuid, + new_actor_url: url::Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let Some((_, inboxes)) = self.accepted_follower_inboxes(&data, user_id).await? else { + tracing::info!(%user_id, "broadcast_move: no accepted followers"); + return Ok(()); + }; + let move_activity = MoveActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: local_actor.ap_id.clone(), + target: new_actor_url.clone(), + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, inboxes, move_activity).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; + tracing::info!(%user_id, target = %new_actor_url, "broadcast_move: dispatched"); + Ok(()) + } +} diff --git a/src/service/delivery.rs b/src/service/delivery.rs new file mode 100644 index 0000000..d40d185 --- /dev/null +++ b/src/service/delivery.rs @@ -0,0 +1,181 @@ +use std::fmt::Debug; + +use activitypub_federation::{activity_sending::SendActivityTask, traits::Activity}; +use serde::Serialize; +use url::Url; + +use crate::actors::{DbActor, get_local_actor}; +use crate::data::{FederationData, FederationEvent}; +use crate::error::Error; + +use super::ActivityPubService; + +pub(crate) async fn send_with_retry( + sends: Vec, + data: &activitypub_federation::config::Data, + max_attempts: u32, + initial_delay_secs: u64, +) -> Vec { + let mut failures = vec![]; + for send in sends { + let mut delay = std::time::Duration::from_secs(initial_delay_secs); + for attempt in 1..=max_attempts { + match send.clone().sign_and_send(data).await { + Ok(()) => break, + Err(e) if attempt < max_attempts => { + tracing::warn!(attempt, error = %e, "delivery failed, retrying"); + tokio::time::sleep(delay).await; + delay *= 2; + } + Err(e) => { + tracing::error!(attempt, error = %e, "delivery failed permanently"); + failures.push(anyhow::anyhow!(e)); + } + } + } + } + failures +} + +/// Wraps a pre-serialized AP activity JSON for re-signing via `SendActivityTask::prepare`. +/// Used by `deliver_to_inbox` when a consumer re-presents a persisted queue item. +#[derive(Debug)] +struct RawActivity { + id: Url, + actor_url: Url, + value: serde_json::Value, +} + +impl Serialize for RawActivity { + fn serialize(&self, s: S) -> Result { + self.value.serialize(s) + } +} + +#[async_trait::async_trait] +impl Activity for RawActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { &self.id } + fn actor(&self) -> &Url { &self.actor_url } + + async fn verify(&self, _data: &activitypub_federation::config::Data) -> Result<(), Self::Error> { + Ok(()) + } + async fn receive(self, _data: &activitypub_federation::config::Data) -> Result<(), Self::Error> { + Ok(()) + } +} + +impl ActivityPubService { + /// Route deliveries to the EventPublisher (one DeliveryRequested event per inbox) + /// or fall back to a fire-and-forget tokio::spawn. + /// `pub(crate)` so sibling modules (broadcast.rs, follow.rs) can call it on `self`. + pub(crate) async fn dispatch_deliveries( + &self, + data: &activitypub_federation::config::Data, + local_actor: &DbActor, + inboxes: Vec, + sends: Vec, + activity_json: serde_json::Value, + ) -> anyhow::Result<()> { + if let Some(publisher) = data.event_publisher.as_ref() { + for inbox in inboxes { + let event = FederationEvent::DeliveryRequested { + inbox, + activity: activity_json.clone(), + signing_actor_id: local_actor.user_id, + }; + if let Err(e) = publisher.publish(event).await { + tracing::warn!(error = %e, "failed to enqueue DeliveryRequested event"); + } + } + } else { + let data = data.clone(); + let max_attempts = self.delivery_max_attempts; + let initial_delay = self.delivery_initial_delay_secs; + tokio::spawn(async move { + let failures = + send_with_retry(sends, &data, max_attempts, initial_delay).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some deliveries failed permanently"); + } + }); + } + Ok(()) + } + + /// Deliver a single outbound activity to `inbox`. + /// Call from a job-queue consumer processing a `FederationEvent::DeliveryRequested` event. + pub async fn deliver_to_inbox( + &self, + inbox: url::Url, + activity: serde_json::Value, + signing_actor_id: uuid::Uuid, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let actor = get_local_actor(signing_actor_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let id = activity + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .unwrap_or_else(|| actor.ap_id.clone()); + let actor_url = activity + .get("actor") + .and_then(|v| v.as_str()) + .and_then(|s| Url::parse(s).ok()) + .unwrap_or_else(|| actor.ap_id.clone()); + let raw = RawActivity { id, actor_url, value: activity.clone() }; + let sends = + SendActivityTask::prepare(&raw, &actor, vec![inbox.clone()], &data).await?; + let failures = send_with_retry( + sends, + &data, + self.delivery_max_attempts, + self.delivery_initial_delay_secs, + ) + .await; + if failures.is_empty() { + return Ok(()); + } + let error_msg = failures + .iter() + .map(|e| e.to_string()) + .collect::>() + .join("; "); + if let Some(publisher) = data.event_publisher.as_ref() { + let _ = publisher + .publish(FederationEvent::DeliveryFailed { + inbox, + activity, + signing_actor_id, + error: error_msg.clone(), + }) + .await; + } + Err(anyhow::anyhow!("delivery failed: {}", error_msg)) + } + + /// Serialize `activity` to JSON and prepare `SendActivityTask` objects. + /// Returns `(activity_json, sends, inboxes)` so both dispatch paths have what they need. + /// `pub(super)` — visible to all child modules of `service` (broadcast.rs, follow.rs, etc.). + pub(super) async fn prepare_broadcast( + &self, + data: &activitypub_federation::config::Data, + local_actor: &DbActor, + inboxes: Vec, + activity: A, + ) -> anyhow::Result<(serde_json::Value, Vec, Vec)> + where + A: Activity + Serialize + Debug + Send + Sync, + { + let with_ctx = activitypub_federation::protocol::context::WithContext::new_default(activity); + let activity_json = serde_json::to_value(&with_ctx)?; + let sends = + SendActivityTask::prepare(&with_ctx, local_actor, inboxes.clone(), data).await?; + Ok((activity_json, sends, inboxes)) + } +} diff --git a/src/service/follow.rs b/src/service/follow.rs new file mode 100644 index 0000000..328e578 --- /dev/null +++ b/src/service/follow.rs @@ -0,0 +1,227 @@ +use activitypub_federation::{fetch::object_id::ObjectId, traits::Actor}; +use url::Url; + +use crate::{ + activities::{AcceptActivity, FollowActivity, RejectActivity, UndoActivity}, + actors::get_local_actor, + data::FederationData, + repository::{FollowerStatus, FollowingStatus, RemoteActor}, + urls::activity_url, +}; + +use super::ActivityPubService; + +impl ActivityPubService { + pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let normalized = handle.trim_start_matches('@'); + let parts: Vec<&str> = normalized.splitn(2, '@').collect(); + if parts.len() == 2 && parts[1] == data.domain { + return self.follow_local(local_user_id, parts[0], &data).await; + } + let remote_actor = self.webfinger_https(handle, &data).await?; + let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let follow_id_str = follow_id.to_string(); + let remote = RemoteActor { + url: remote_actor.ap_id.to_string(), + handle: format!("{}@{}", remote_actor.username, remote_actor.ap_id.host_str().unwrap_or("")), + inbox_url: remote_actor.inbox_url.to_string(), + shared_inbox_url: remote_actor.shared_inbox_url.as_ref().map(|u| u.to_string()), + display_name: Some(remote_actor.username.clone()), + avatar_url: remote_actor.avatar_url.as_ref().map(|u| u.to_string()), + outbox_url: Some(remote_actor.outbox_url.to_string()), + }; + // Save BEFORE delivering — prevents lost state on process restart. + data.federation_repo.add_following(local_user_id, remote, &follow_id_str).await?; + let follow = FollowActivity { + id: Url::parse(&follow_id_str)?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_actor.ap_id.clone()), + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![remote_actor.inbox()], follow).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await + } + + pub async fn unfollow(&self, local_user_id: uuid::Uuid, actor_url_str: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + if actor_url_str.starts_with(&self.base_url) { + return self.unfollow_local(local_user_id, actor_url_str, &data).await; + } + let remote = data.federation_repo.get_remote_actor(actor_url_str).await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; + let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let remote_ap_id = Url::parse(actor_url_str)?; + let inbox = Url::parse(&remote.inbox_url)?; + let follow_id = data.federation_repo.get_follow_activity_id(local_user_id, actor_url_str).await? + .and_then(|id| Url::parse(&id).ok()) + .unwrap_or_else(|| activity_url(&self.base_url).unwrap_or_else(|_| remote_ap_id.clone())); + let follow = FollowActivity { id: follow_id, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: ObjectId::from(remote_ap_id) }; + let undo = UndoActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: serde_json::to_value(&follow).map_err(|e| anyhow::anyhow!("{e}"))?, + }; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], undo).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; + data.federation_repo.remove_following(local_user_id, actor_url_str).await?; + data.object_handler.on_actor_removed(&Url::parse(actor_url_str)?).await?; + Ok(()) + } + + pub async fn accept_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let remote_actor = data.federation_repo.get_remote_actor(remote_actor_url).await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; + let follow_id_str = data.federation_repo.get_follower_follow_activity_id(local_user_id, remote_actor_url).await? + .ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?; + let follow = FollowActivity { id: Url::parse(&follow_id_str)?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; + let accept = AcceptActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; + data.federation_repo.update_follower_status(local_user_id, remote_actor_url, FollowerStatus::Accepted).await?; + let inbox = Url::parse(&remote_actor.inbox_url)?; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], accept).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; + let target_inbox = remote_actor.shared_inbox_url.clone().unwrap_or_else(|| remote_actor.inbox_url.clone()); + self.spawn_backfill(local_user_id, target_inbox); + Ok(()) + } + + pub async fn reject_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let remote_actor = data.federation_repo.get_remote_actor(remote_actor_url).await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found"))?; + let follow = FollowActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(Url::parse(remote_actor_url)?), object: ObjectId::from(local_actor.ap_id.clone()) }; + let reject = RejectActivity { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: follow }; + let inbox = Url::parse(&remote_actor.inbox_url)?; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], reject).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; + data.federation_repo.remove_follower(local_user_id, remote_actor_url).await?; + Ok(()) + } + + pub async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo.get_pending_followers(local_user_id).await + } + + pub async fn get_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + Ok(data.federation_repo.get_followers(local_user_id).await? + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .map(|f| f.actor) + .collect()) + } + + pub async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> anyhow::Result { + let data = self.federation_config.to_request_data(); + Ok(data.federation_repo.get_followers(local_user_id).await? + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .count()) + } + + pub async fn get_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo.get_following(local_user_id).await + } + + pub async fn count_following(&self, local_user_id: uuid::Uuid) -> anyhow::Result { + let data = self.federation_config.to_request_data(); + data.federation_repo.count_following(local_user_id).await + } + + pub async fn remove_follower(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.remove_follower(local_user_id, actor_url).await + } + + pub async fn block_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.add_blocked_actor(local_user_id, actor_url).await?; + let _ = data.federation_repo.remove_follower(local_user_id, actor_url).await; + let _ = data.federation_repo.remove_following(local_user_id, actor_url).await; + let local_actor = get_local_actor(local_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + if let Ok(Some(remote_actor)) = data.federation_repo.get_remote_actor(actor_url).await { + let block = crate::activities::BlockActivity { + id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: Url::parse(actor_url)?, + }; + let inbox = Url::parse(&remote_actor.inbox_url)?; + let (json, sends, inboxes) = self.prepare_broadcast(&data, &local_actor, vec![inbox], block).await?; + self.dispatch_deliveries(&data, &local_actor, inboxes, sends, json).await?; + } + Ok(()) + } + + pub async fn unblock_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.remove_blocked_actor(local_user_id, actor_url).await + } + + pub async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + let actor_urls = data.federation_repo.get_blocked_actors(local_user_id).await?; + let mut actors = Vec::new(); + for url in actor_urls { + let actor = match data.federation_repo.get_remote_actor(&url).await { + Ok(Some(a)) => a, + _ => RemoteActor { url: url.clone(), handle: url.clone(), inbox_url: url.clone(), shared_inbox_url: None, display_name: None, avatar_url: None, outbox_url: None }, + }; + actors.push(actor); + } + Ok(actors) + } + + pub(super) async fn follow_local( + &self, + local_user_id: uuid::Uuid, + target_username: &str, + data: &activitypub_federation::config::Data, + ) -> anyhow::Result<()> { + let target = data.user_repo.find_by_username(target_username).await? + .ok_or_else(|| anyhow::anyhow!("user not found: {}", target_username))?; + if target.id == local_user_id { + return Err(anyhow::anyhow!("cannot follow yourself")); + } + let follower_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); + let target_actor_url = crate::urls::actor_url(&self.base_url, target.id); + let follow_id = activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?.to_string(); + data.federation_repo.add_follower(target.id, &follower_actor_url, FollowerStatus::Accepted, &follow_id).await?; + let target_as_remote = RemoteActor { + url: target_actor_url.to_string(), + handle: format!("{}@{}", target.username, data.domain), + inbox_url: format!("{}/inbox", target_actor_url), + shared_inbox_url: None, + display_name: Some(target.username), + avatar_url: None, + outbox_url: None, + }; + data.federation_repo.add_following(local_user_id, target_as_remote, &follow_id).await?; + data.federation_repo.update_following_status(local_user_id, target_actor_url.as_ref(), FollowingStatus::Accepted).await?; + tracing::info!(follower = %local_user_id, followee = %target.id, "local follow"); + Ok(()) + } + + pub(super) async fn unfollow_local( + &self, + local_user_id: uuid::Uuid, + target_actor_url: &str, + data: &activitypub_federation::config::Data, + ) -> anyhow::Result<()> { + let target_url = Url::parse(target_actor_url)?; + let target_user_id = crate::urls::extract_user_id_from_url(&target_url) + .ok_or_else(|| anyhow::anyhow!("invalid local actor URL: {}", target_actor_url))?; + let local_actor_url = crate::urls::actor_url(&self.base_url, local_user_id).to_string(); + data.federation_repo.remove_follower(target_user_id, &local_actor_url).await?; + data.federation_repo.remove_following(local_user_id, target_actor_url).await?; + tracing::info!(follower = %local_user_id, followee = %target_user_id, "local unfollow"); + Ok(()) + } +} diff --git a/src/service/mod.rs b/src/service/mod.rs new file mode 100644 index 0000000..5d3add6 --- /dev/null +++ b/src/service/mod.rs @@ -0,0 +1,261 @@ +use std::sync::Arc; + +use activitypub_federation::{ + protocol::context::WithContext, + traits::Object, +}; +use axum::{Router, extract::DefaultBodyLimit, routing::get, routing::post}; +use url::Url; + +use crate::{ + actor_handler::actor_handler, + actors::{DbActor, get_local_actor}, + content::ApObjectHandler, + data::FederationData, + federation::ApFederationConfig, + followers_handler::{followers_handler, following_handler}, + inbox::inbox_handler, + nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler}, + outbox::outbox_handler, + repository::{BlockedDomain, FederationRepository}, + user::ApUserRepository, + webfinger::webfinger_handler, +}; + +mod backfill; +mod broadcast; +pub(super) mod delivery; +mod follow; + +/// Default max delivery retries per inbox (used as the builder default). +pub const DELIVERY_MAX_ATTEMPTS: u32 = 3; +/// Default initial retry backoff in seconds; doubles each attempt. +pub const DELIVERY_INITIAL_DELAY_SECS: u64 = 1; +/// HTTP timeout when fetching remote AP resources. +pub const HTTP_FETCH_TIMEOUT_SECS: u64 = 30; +/// Sleep between backfill send batches. +pub const BATCH_FETCH_SLEEP_MS: u64 = 100; + +#[derive(Clone)] +pub struct ActivityPubService { + pub(super) federation_config: ApFederationConfig, + pub(super) base_url: String, + pub(super) delivery_max_attempts: u32, + pub(super) delivery_initial_delay_secs: u64, +} + +pub struct ActivityPubServiceBuilder { + repo: Arc, + user_repo: Arc, + object_handler: Arc, + base_url: String, + allow_registration: bool, + software_name: String, + debug: bool, + event_publisher: Option>, + delivery_max_attempts: u32, + delivery_initial_delay_secs: u64, +} + +impl ActivityPubServiceBuilder { + pub fn allow_registration(mut self, v: bool) -> Self { self.allow_registration = v; self } + pub fn software_name(mut self, v: impl Into) -> Self { self.software_name = v.into(); self } + pub fn debug(mut self, v: bool) -> Self { self.debug = v; self } + pub fn event_publisher(mut self, v: Arc) -> Self { self.event_publisher = Some(v); self } + /// Override max delivery retries (default: `DELIVERY_MAX_ATTEMPTS`). + pub fn delivery_max_attempts(mut self, v: u32) -> Self { self.delivery_max_attempts = v; self } + /// Override initial retry backoff in seconds (default: `DELIVERY_INITIAL_DELAY_SECS`). + pub fn delivery_initial_delay_secs(mut self, v: u64) -> Self { self.delivery_initial_delay_secs = v; self } + + pub async fn build(self) -> anyhow::Result { + let data = FederationData::new( + self.repo, self.user_repo, self.object_handler, self.base_url.clone(), + self.allow_registration, self.software_name, self.event_publisher, + ); + let federation_config = ApFederationConfig::new(data, self.debug).await?; + Ok(ActivityPubService { + federation_config, + base_url: self.base_url, + delivery_max_attempts: self.delivery_max_attempts, + delivery_initial_delay_secs: self.delivery_initial_delay_secs, + }) + } +} + +impl ActivityPubService { + pub fn builder( + repo: Arc, + user_repo: Arc, + object_handler: Arc, + base_url: impl Into, + ) -> ActivityPubServiceBuilder { + ActivityPubServiceBuilder { + repo, user_repo, object_handler, base_url: base_url.into(), + allow_registration: false, software_name: String::new(), + debug: false, event_publisher: None, + delivery_max_attempts: DELIVERY_MAX_ATTEMPTS, + delivery_initial_delay_secs: DELIVERY_INITIAL_DELAY_SECS, + } + } + + pub fn federation_config(&self) -> &ApFederationConfig { &self.federation_config } + pub fn request_data(&self) -> activitypub_federation::config::Data { self.federation_config.to_request_data() } + pub fn base_url(&self) -> &str { &self.base_url } + + /// Returns the ActivityPub router. Inbox routes enforce a 1 MB body limit. + pub fn router(&self) -> Router + where + S: Clone + Send + Sync + 'static, + { + Router::new() + .route("/.well-known/nodeinfo", get(nodeinfo_well_known_handler)) + .route("/nodeinfo/2.0", get(nodeinfo_handler)) + .route("/.well-known/webfinger", get(webfinger_handler)) + .route("/inbox", post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024))) + .route("/users/{id}", get(actor_handler)) + .route("/users/{id}/inbox", post(inbox_handler).layer(DefaultBodyLimit::max(1024 * 1024))) + .route("/users/{id}/outbox", get(outbox_handler)) + .route("/users/{id}/followers", get(followers_handler)) + .route("/users/{id}/following", get(following_handler)) + .layer(self.federation_config.middleware()) + } + + pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result { + let uuid = uuid::Uuid::parse_str(user_id_str)?; + let data = self.federation_config.to_request_data(); + let actor = get_local_actor(uuid, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let person = actor.into_json(&data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + Ok(serde_json::to_string(&WithContext::new_default(person))?) + } + + pub async fn followers_collection_json(&self, user_id: uuid::Uuid, page: Option) -> anyhow::Result { + const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; + const PAGE_SIZE: usize = 20; + let data = self.federation_config.to_request_data(); + let collection_id = format!("{}/users/{}/followers", self.base_url, user_id); + let total = data.federation_repo.count_followers(user_id).await?; + let obj = if let Some(p) = page { + let p = p.max(1); + let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; + let followers = data.federation_repo.get_followers_page(user_id, offset as u32, PAGE_SIZE).await?; + let has_next = offset + followers.len() < total; + let items: Vec = followers.into_iter().map(|f| f.actor.url).collect(); + let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); + if has_next { obj["next"] = serde_json::json!(format!("{}?page={}",collection_id,p+1)); } + obj + } else { + serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollection","id":collection_id,"totalItems":total,"first":format!("{}?page=1",collection_id)}) + }; + Ok(serde_json::to_string(&obj)?) + } + + pub async fn following_collection_json(&self, user_id: uuid::Uuid, page: Option) -> anyhow::Result { + const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; + const PAGE_SIZE: usize = 20; + let data = self.federation_config.to_request_data(); + let collection_id = format!("{}/users/{}/following", self.base_url, user_id); + let total = data.federation_repo.count_following(user_id).await?; + let obj = if let Some(p) = page { + let p = p.max(1); + let offset = (p.saturating_sub(1) as usize) * PAGE_SIZE; + let following = data.federation_repo.get_following_page(user_id, offset as u32, PAGE_SIZE).await?; + let has_next = offset + following.len() < total; + let items: Vec = following.into_iter().map(|a| a.url).collect(); + let mut obj = serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollectionPage","id":format!("{}?page={}",collection_id,p),"partOf":collection_id,"totalItems":total,"orderedItems":items}); + if has_next { obj["next"] = serde_json::json!(format!("{}?page={}",collection_id,p+1)); } + obj + } else { + serde_json::json!({"@context":AP_CONTEXT,"type":"OrderedCollection","id":collection_id,"totalItems":total,"first":format!("{}?page=1",collection_id)}) + }; + Ok(serde_json::to_string(&obj)?) + } + + pub async fn mark_follower_accepted(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.update_follower_status(user_id, actor_url, crate::repository::FollowerStatus::Accepted).await.map_err(|e| anyhow::anyhow!("{e}")) + } + + pub async fn mark_follower_rejected(&self, user_id: uuid::Uuid, actor_url: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.remove_follower(user_id, actor_url).await.map_err(|e| anyhow::anyhow!("{e}")) + } + + pub async fn lookup_actor_by_handle(&self, handle: &str) -> anyhow::Result { + tracing::info!(handle, "looking up remote actor"); + let data = self.federation_config.to_request_data(); + let actor = self.webfinger_https(handle, &data).await + .inspect_err(|e| tracing::warn!(handle, error = %e, "actor lookup failed"))?; + let domain = actor.ap_id.host_str().unwrap_or("").to_string(); + tracing::info!(handle = format!("{}@{}", actor.username, domain), ap_url = %actor.ap_id, "remote actor resolved"); + Ok(crate::user::LookedUpActor { + handle: format!("{}@{}", actor.username, domain), + display_name: actor.display_name, bio: actor.bio, + avatar_url: actor.avatar_url, banner_url: actor.banner_url, + ap_url: actor.ap_id, outbox_url: Some(actor.outbox_url), + followers_url: Some(actor.followers_url), following_url: Some(actor.following_url), + also_known_as: actor.also_known_as, profile_url: actor.profile_url, + attachment: actor.attachment, + }) + } + + pub async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.add_blocked_domain(domain, reason).await + } + + pub async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + data.federation_repo.remove_blocked_domain(domain).await + } + + pub async fn get_blocked_domains(&self) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo.get_blocked_domains().await + } + + // ── Private helpers (accessible to child modules via Rust's privacy rules) ─ + + async fn accepted_follower_inboxes( + &self, + data: &activitypub_federation::config::Data, + local_user_id: uuid::Uuid, + ) -> anyhow::Result)>> { + let local_actor = get_local_actor(local_user_id, data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + let inbox_strs = data.federation_repo.get_accepted_follower_inboxes(local_user_id).await?; + if inbox_strs.is_empty() { return Ok(None); } + let inboxes: Vec = inbox_strs.into_iter().filter_map(|s| { + Url::parse(&s).map_err(|e| tracing::warn!(inbox = %s, error = %e, "skipping unparseable inbox URL")).ok() + }).collect(); + if inboxes.is_empty() { return Ok(None); } + Ok(Some((local_actor, inboxes))) + } + + async fn webfinger_https( + &self, + handle: &str, + data: &activitypub_federation::config::Data, + ) -> anyhow::Result { + let normalized = handle.trim_start_matches('@'); + let at = normalized.rfind('@').ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?; + let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]); + let wf_url = format!("https://{}/.well-known/webfinger?resource=acct:{}@{}", domain_str, user, domain_str); + tracing::debug!(handle, wf_url, "resolving webfinger"); + let wf: serde_json::Value = reqwest::Client::new().get(&wf_url) + .header("Accept", "application/jrd+json, application/json") + .send().await?.json().await?; + let self_href = wf["links"].as_array() + .and_then(|links| links.iter().find(|l| l["rel"].as_str() == Some("self") && l["type"].as_str() == Some("application/activity+json"))) + .and_then(|l| l["href"].as_str()) + .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))?.to_owned(); + tracing::debug!(handle, self_href, "webfinger resolved, fetching actor"); + let actor: DbActor = activitypub_federation::fetch::object_id::ObjectId::from(url::Url::parse(&self_href)?) + .dereference(data).await.map_err(|e| anyhow::anyhow!("{e}"))?; + Ok(actor) + } +} + +#[cfg(test)] +mod tests { + // Inbox deduplication and broadcast filtering are now tested via repository + // integration tests in the consuming crate. See get_accepted_follower_inboxes. +}