feat(activitypub-base): impl fetch_actor_urls_from_collection + resolve_actor_profiles (concurrent, 5s timeout)
This commit is contained in:
@@ -5,6 +5,7 @@ edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
tokio = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
|
||||
@@ -1602,16 +1602,81 @@ impl domain::ports::FederationActionPort for ActivityPubService {
|
||||
|
||||
async fn fetch_actor_urls_from_collection(
|
||||
&self,
|
||||
_collection_url: &str,
|
||||
collection_url: &str,
|
||||
) -> Result<Vec<String>, domain::errors::DomainError> {
|
||||
Ok(vec![])
|
||||
let resp: serde_json::Value = reqwest::Client::new()
|
||||
.get(collection_url)
|
||||
.header("Accept", "application/activity+json, application/ld+json")
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
|
||||
|
||||
let empty = vec![];
|
||||
let items = resp["orderedItems"].as_array().unwrap_or(&empty);
|
||||
Ok(items
|
||||
.iter()
|
||||
.filter_map(|v| v.as_str().map(|s| s.to_string()))
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn resolve_actor_profiles(
|
||||
&self,
|
||||
_urls: Vec<String>,
|
||||
urls: Vec<String>,
|
||||
) -> Vec<domain::models::actor_connection_summary::ActorConnectionSummary> {
|
||||
vec![]
|
||||
use futures::future;
|
||||
|
||||
async fn fetch_one(
|
||||
url: String,
|
||||
) -> Option<domain::models::actor_connection_summary::ActorConnectionSummary> {
|
||||
let resp: serde_json::Value = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(5),
|
||||
reqwest::Client::new()
|
||||
.get(&url)
|
||||
.header("Accept", "application/activity+json")
|
||||
.send(),
|
||||
)
|
||||
.await
|
||||
.ok()?
|
||||
.ok()?
|
||||
.json()
|
||||
.await
|
||||
.ok()?;
|
||||
|
||||
let ap_url = resp["id"].as_str()?.to_string();
|
||||
let preferred_username = resp["preferredUsername"].as_str().unwrap_or("").to_string();
|
||||
let domain_str = url::Url::parse(&ap_url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
let handle = format!("{}@{}", preferred_username, domain_str);
|
||||
let display_name = resp["name"].as_str().map(|s| s.to_string());
|
||||
let avatar_url = resp["icon"]["url"].as_str().map(|s| s.to_string());
|
||||
|
||||
Some(
|
||||
domain::models::actor_connection_summary::ActorConnectionSummary {
|
||||
url: ap_url,
|
||||
handle,
|
||||
display_name,
|
||||
avatar_url,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
let futs: Vec<_> = urls.into_iter().map(fetch_one).collect();
|
||||
let results = future::join_all(futs).await;
|
||||
|
||||
results
|
||||
.into_iter()
|
||||
.filter_map(|r| {
|
||||
if r.is_none() {
|
||||
tracing::warn!("failed to resolve actor profile (timeout or parse error)");
|
||||
}
|
||||
r
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,12 @@ where
|
||||
{
|
||||
}
|
||||
|
||||
fn _assert_impl_federation_action_port_connections()
|
||||
where
|
||||
crate::service::ActivityPubService: domain::ports::FederationActionPort,
|
||||
{
|
||||
}
|
||||
|
||||
use super::*;
|
||||
use crate::repository::{Follower, FollowerStatus, RemoteActor};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user