From 58126f195c1bd66fa421e8db19523ba04f7e3d8c Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 00:33:14 +0200 Subject: [PATCH] feat(activitypub-base): impl fetch_actor_urls_from_collection + resolve_actor_profiles (concurrent, 5s timeout) --- crates/adapters/activitypub-base/Cargo.toml | 1 + .../adapters/activitypub-base/src/service.rs | 73 ++++++++++++++++++- .../activitypub-base/src/tests/service.rs | 6 ++ 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/crates/adapters/activitypub-base/Cargo.toml b/crates/adapters/activitypub-base/Cargo.toml index e195664..3efc249 100644 --- a/crates/adapters/activitypub-base/Cargo.toml +++ b/crates/adapters/activitypub-base/Cargo.toml @@ -5,6 +5,7 @@ edition = "2024" [dependencies] tokio = { workspace = true } +futures = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } uuid = { workspace = true } diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index d31928f..4c02646 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -1602,16 +1602,81 @@ impl domain::ports::FederationActionPort for ActivityPubService { async fn fetch_actor_urls_from_collection( &self, - _collection_url: &str, + collection_url: &str, ) -> Result, domain::errors::DomainError> { - Ok(vec![]) + let resp: serde_json::Value = reqwest::Client::new() + .get(collection_url) + .header("Accept", "application/activity+json, application/ld+json") + .send() + .await + .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))? + .json() + .await + .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; + + let empty = vec![]; + let items = resp["orderedItems"].as_array().unwrap_or(&empty); + Ok(items + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect()) } async fn resolve_actor_profiles( &self, - _urls: Vec, + urls: Vec, ) -> Vec { - vec![] + use futures::future; + + async fn fetch_one( + url: String, + ) -> Option { + let resp: serde_json::Value = tokio::time::timeout( + std::time::Duration::from_secs(5), + reqwest::Client::new() + .get(&url) + .header("Accept", "application/activity+json") + .send(), + ) + .await + .ok()? + .ok()? + .json() + .await + .ok()?; + + let ap_url = resp["id"].as_str()?.to_string(); + let preferred_username = resp["preferredUsername"].as_str().unwrap_or("").to_string(); + let domain_str = url::Url::parse(&ap_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + let handle = format!("{}@{}", preferred_username, domain_str); + let display_name = resp["name"].as_str().map(|s| s.to_string()); + let avatar_url = resp["icon"]["url"].as_str().map(|s| s.to_string()); + + Some( + domain::models::actor_connection_summary::ActorConnectionSummary { + url: ap_url, + handle, + display_name, + avatar_url, + }, + ) + } + + let futs: Vec<_> = urls.into_iter().map(fetch_one).collect(); + let results = future::join_all(futs).await; + + results + .into_iter() + .filter_map(|r| { + if r.is_none() { + tracing::warn!("failed to resolve actor profile (timeout or parse error)"); + } + r + }) + .collect() } } diff --git a/crates/adapters/activitypub-base/src/tests/service.rs b/crates/adapters/activitypub-base/src/tests/service.rs index b0b4752..3f81776 100644 --- a/crates/adapters/activitypub-base/src/tests/service.rs +++ b/crates/adapters/activitypub-base/src/tests/service.rs @@ -4,6 +4,12 @@ where { } +fn _assert_impl_federation_action_port_connections() +where + crate::service::ActivityPubService: domain::ports::FederationActionPort, +{ +} + use super::*; use crate::repository::{Follower, FollowerStatus, RemoteActor};