From 1874954ad7c3883fc1783ee01004271f4469c0b3 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 17 May 2026 23:02:49 +0200 Subject: [PATCH] fix: resolve thoughts compile errors after k-ap migration --- Cargo.lock | 51 +- crates/adapters/activitypub/Cargo.toml | 6 +- crates/adapters/activitypub/src/lib.rs | 2 + crates/adapters/activitypub/src/service.rs | 830 ++++++++++++++++++ .../adapters/postgres-federation/Cargo.toml | 2 +- crates/bootstrap/Cargo.toml | 2 +- crates/bootstrap/src/factory.rs | 10 +- crates/worker/Cargo.toml | 2 +- crates/worker/src/factory.rs | 8 +- 9 files changed, 879 insertions(+), 34 deletions(-) create mode 100644 crates/adapters/activitypub/src/service.rs diff --git a/Cargo.lock b/Cargo.lock index f9a52ac..4b07b14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,22 +5,6 @@ version = 4 [[package]] name = "activitypub" version = "0.1.0" -dependencies = [ - "activitypub-base", - "anyhow", - "async-trait", - "chrono", - "domain", - "serde", - "serde_json", - "tracing", - "url", - "uuid", -] - -[[package]] -name = "activitypub-base" -version = "0.1.0" dependencies = [ "activitypub_federation", "anyhow", @@ -28,8 +12,8 @@ dependencies = [ "axum", "chrono", "domain", - "enum_delegate", "futures", + "k-ap", "reqwest", "serde", "serde_json", @@ -289,7 +273,7 @@ dependencies = [ name = "application" version = "0.1.0" dependencies = [ - "activitypub-base", + "activitypub", "async-trait", "chrono", "domain", @@ -596,7 +580,6 @@ name = "bootstrap" version = "0.1.0" dependencies = [ "activitypub", - "activitypub-base", "async-nats", "async-trait", "auth", @@ -605,6 +588,7 @@ dependencies = [ "dotenvy", "event-transport", "http 1.4.0", + "k-ap", "nats", "postgres", "postgres-federation", @@ -2005,6 +1989,27 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "k-ap" +version = "0.1.0" +source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.2#767b1e69d4f384093ea33d72d5aa46ff140f5ac8" +dependencies = [ + "activitypub_federation", + "anyhow", + "async-trait", + "axum", + "chrono", + "enum_delegate", + "futures", + "reqwest", + "serde", + "serde_json", + "tokio", + "tracing", + "url", + "uuid", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -2452,7 +2457,7 @@ checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" name = "postgres" version = "0.1.0" dependencies = [ - "activitypub-base", + "activitypub", "async-trait", "chrono", "domain", @@ -2470,10 +2475,10 @@ dependencies = [ name = "postgres-federation" version = "0.1.0" dependencies = [ - "activitypub-base", "anyhow", "async-trait", "chrono", + "k-ap", "sqlx", "tokio", "tracing", @@ -2522,7 +2527,7 @@ dependencies = [ name = "presentation" version = "0.1.0" dependencies = [ - "activitypub-base", + "activitypub", "api-types", "application", "async-trait", @@ -4715,7 +4720,6 @@ name = "worker" version = "0.1.0" dependencies = [ "activitypub", - "activitypub-base", "application", "async-nats", "domain", @@ -4723,6 +4727,7 @@ dependencies = [ "event-payload", "event-transport", "futures", + "k-ap", "nats", "postgres", "postgres-federation", diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml index c2ca55f..3ad9899 100644 --- a/crates/adapters/activitypub/Cargo.toml +++ b/crates/adapters/activitypub/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.0" } +k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.2" } domain = { workspace = true } url = { workspace = true } serde = { workspace = true } @@ -15,3 +15,7 @@ uuid = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } activitypub_federation = "0.7.0-beta.11" +reqwest = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +axum = { workspace = true } diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index 13ef1d1..837dc26 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -1,9 +1,11 @@ pub mod handler; pub mod note; pub mod port; +pub mod service; pub mod urls; pub use handler::ThoughtsObjectHandler; pub use note::ThoughtNote; pub use port::{AcceptNoteInput, ActivityPubRepository, ActorApUrls, OutboundFederationPort, OutboxEntry}; +pub use service::ApFederationAdapter; pub use urls::ThoughtsUrls; diff --git a/crates/adapters/activitypub/src/service.rs b/crates/adapters/activitypub/src/service.rs new file mode 100644 index 0000000..672f4ba --- /dev/null +++ b/crates/adapters/activitypub/src/service.rs @@ -0,0 +1,830 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use k_ap::ActivityPubService; + +use domain::{ + errors::DomainError, + models::remote_actor::RemoteActor as DomainRemoteActor, + ports::{ + FederationFetchPort, FederationFollowPort, FederationFollowRequestPort, + FederationLookupPort, FederationSchedulerPort, RemoteActorConnectionRepository, + }, + value_objects::UserId, +}; + +const HTTP_FETCH_TIMEOUT_SECS: u64 = 30; +const BATCH_FETCH_SLEEP_MS: u64 = 100; + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn content_to_html(text: &str) -> String { + let escaped = text + .replace('&', "&") + .replace('<', "<") + .replace('>', ">") + .replace('"', """); + let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect(); + if paragraphs.is_empty() { + format!("

{}

", escaped) + } else { + paragraphs + .iter() + .map(|p| format!("

{}

", p)) + .collect::>() + .join("") + } +} + +fn build_note_json( + thought: &domain::models::thought::Thought, + local_actor_ap_id: &str, + local_actor_followers_url: &str, + base_url: &str, + in_reply_to_url: Option<&str>, +) -> serde_json::Value { + let ap_id = format!("{}/thoughts/{}", base_url, thought.id); + + let (to, cc) = match thought.visibility { + domain::models::thought::Visibility::Public => ( + vec![k_ap::AS_PUBLIC.to_string()], + vec![local_actor_followers_url.to_string()], + ), + domain::models::thought::Visibility::Unlisted => ( + vec![local_actor_followers_url.to_string()], + vec![k_ap::AS_PUBLIC.to_string()], + ), + domain::models::thought::Visibility::Followers => { + (vec![local_actor_followers_url.to_string()], vec![]) + } + domain::models::thought::Visibility::Direct => (vec![], vec![]), + }; + + let mut note = serde_json::json!({ + "type": "Note", + "id": ap_id, + "url": ap_id, + "attributedTo": local_actor_ap_id, + "content": content_to_html(thought.content.as_str()), + "published": thought.created_at.to_rfc3339(), + "to": to, + "cc": cc, + "sensitive": thought.sensitive, + }); + if let Some(ref cw) = thought.content_warning { + note["summary"] = serde_json::json!(cw); + } + if let Some(reply_url) = in_reply_to_url { + note["inReplyTo"] = serde_json::json!(reply_url); + } + if let Some(updated_at) = thought.updated_at { + note["updated"] = serde_json::json!(updated_at.to_rfc3339()); + } + let hashtags = domain::hashtag::extract(thought.content.as_str()); + if !hashtags.is_empty() { + let ap_tags: Vec = hashtags + .iter() + .map(|h| { + serde_json::json!({ + "type": "Hashtag", + "name": h.ap_name, + "href": format!("{}/{}", base_url, h.url_slug), + }) + }) + .collect(); + note["tag"] = serde_json::json!(ap_tags); + } + note +} + +fn k_ap_actor_to_domain(a: k_ap::RemoteActor) -> DomainRemoteActor { + DomainRemoteActor { + url: a.url, + handle: a.handle, + display_name: a.display_name, + avatar_url: a.avatar_url, + outbox_url: a.outbox_url, + last_fetched_at: chrono::Utc::now(), + bio: None, + banner_url: None, + also_known_as: None, + followers_url: None, + following_url: None, + attachment: vec![], + } +} + +async fn resolve_actor_profiles_from_urls( + urls: Vec, +) -> Vec { + use futures::future; + + async fn fetch_one( + url: String, + ) -> Option { + let resp: serde_json::Value = tokio::time::timeout( + std::time::Duration::from_secs(5), + reqwest::Client::new() + .get(&url) + .header("Accept", "application/activity+json") + .send(), + ) + .await + .ok()? + .ok()? + .json() + .await + .ok()?; + + let ap_url = resp["id"].as_str()?.to_string(); + let preferred_username = resp["preferredUsername"].as_str().unwrap_or("").to_string(); + let domain_str = url::Url::parse(&ap_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + let handle = format!("{}@{}", preferred_username, domain_str); + let display_name = resp["name"].as_str().map(|s| s.to_string()); + let avatar_url = resp["icon"]["url"].as_str().map(|s| s.to_string()); + + Some(domain::models::actor_connection_summary::ActorConnectionSummary { + url: ap_url, + handle, + display_name, + avatar_url, + }) + } + + let futs: Vec<_> = urls.into_iter().map(fetch_one).collect(); + let results = future::join_all(futs).await; + + results + .into_iter() + .filter_map(|r| { + if r.is_none() { + tracing::warn!("failed to resolve actor profile (timeout or parse error)"); + } + r + }) + .collect() +} + +async fn webfinger_resolve_actor_url(handle: &str) -> anyhow::Result { + let normalized = handle.trim_start_matches('@'); + let at = normalized + .rfind('@') + .ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?; + let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]); + let wf_url = format!( + "https://{}/.well-known/webfinger?resource=acct:{}@{}", + domain_str, user, domain_str + ); + let wf: serde_json::Value = reqwest::Client::new() + .get(&wf_url) + .header("Accept", "application/jrd+json, application/json") + .send() + .await? + .json() + .await?; + let self_href = wf["links"] + .as_array() + .and_then(|links| { + links.iter().find(|l| { + l["rel"].as_str() == Some("self") + && l["type"].as_str() == Some("application/activity+json") + }) + }) + .and_then(|l| l["href"].as_str()) + .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))? + .to_owned(); + Ok(self_href) +} + +// ── ApFederationAdapter ─────────────────────────────────────────────────────── + +/// Wraps `k_ap::ActivityPubService` together with the `RemoteActorConnectionRepository` +/// (which k-ap doesn't own), and implements all domain federation port traits. +#[derive(Clone)] +pub struct ApFederationAdapter { + pub(crate) inner: Arc, + pub(crate) connections_repo: Arc, +} + +impl ApFederationAdapter { + pub fn new( + inner: Arc, + connections_repo: Arc, + ) -> Self { + Self { + inner, + connections_repo, + } + } + + pub fn router(&self) -> axum::Router + where + S: Clone + Send + Sync + 'static, + { + self.inner.router() + } + + fn base_url(&self) -> &str { + self.inner.base_url() + } + + fn actor_ap_id(&self, user_uuid: uuid::Uuid) -> String { + format!("{}/users/{}", self.base_url(), user_uuid) + } + + fn actor_followers_url(&self, user_uuid: uuid::Uuid) -> String { + format!("{}/followers", self.actor_ap_id(user_uuid)) + } +} + +// ── OutboundFederationPort ──────────────────────────────────────────────────── + +#[async_trait] +impl crate::port::OutboundFederationPort for ApFederationAdapter { + async fn broadcast_create( + &self, + author_user_id: &UserId, + thought: &domain::models::thought::Thought, + _author_username: &str, + in_reply_to_url: Option<&str>, + ) -> Result<(), DomainError> { + let user_uuid = author_user_id.as_uuid(); + let ap_id = self.actor_ap_id(user_uuid); + let followers_url = self.actor_followers_url(user_uuid); + let note = build_note_json(thought, &ap_id, &followers_url, self.base_url(), in_reply_to_url); + self.inner + .broadcast_create_note(user_uuid, note) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + + async fn broadcast_delete( + &self, + author_user_id: &UserId, + thought_ap_id: &str, + ) -> Result<(), DomainError> { + let ap_id = url::Url::parse(thought_ap_id) + .map_err(|e| DomainError::Internal(e.to_string()))?; + self.inner + .broadcast_delete_to_followers(author_user_id.as_uuid(), ap_id) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + + async fn broadcast_update( + &self, + author_user_id: &UserId, + thought: &domain::models::thought::Thought, + _author_username: &str, + in_reply_to_url: Option<&str>, + ) -> Result<(), DomainError> { + let user_uuid = author_user_id.as_uuid(); + let ap_id = self.actor_ap_id(user_uuid); + let followers_url = self.actor_followers_url(user_uuid); + let note = build_note_json(thought, &ap_id, &followers_url, self.base_url(), in_reply_to_url); + self.inner + .broadcast_update_note(user_uuid, note) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + + async fn broadcast_announce( + &self, + booster_user_id: &UserId, + object_ap_id: &str, + ) -> Result<(), DomainError> { + let ap_id = url::Url::parse(object_ap_id) + .map_err(|e| DomainError::Internal(e.to_string()))?; + self.inner + .broadcast_announce_to_followers(booster_user_id.as_uuid(), ap_id) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + + async fn broadcast_undo_announce( + &self, + booster_user_id: &UserId, + object_ap_id: &str, + ) -> Result<(), DomainError> { + let ap_id = url::Url::parse(object_ap_id) + .map_err(|e| DomainError::Internal(e.to_string()))?; + self.inner + .broadcast_undo_announce_to_followers(booster_user_id.as_uuid(), ap_id) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + + async fn broadcast_like( + &self, + liker_user_id: &UserId, + object_ap_id: &str, + author_inbox_url: &str, + ) -> Result<(), DomainError> { + let object = url::Url::parse(object_ap_id) + .map_err(|e| DomainError::Internal(e.to_string()))?; + let inbox = url::Url::parse(author_inbox_url) + .map_err(|e| DomainError::Internal(e.to_string()))?; + self.inner + .broadcast_like_to_inbox(liker_user_id.as_uuid(), object, inbox) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + + async fn broadcast_undo_like( + &self, + liker_user_id: &UserId, + object_ap_id: &str, + author_inbox_url: &str, + ) -> Result<(), DomainError> { + let object = url::Url::parse(object_ap_id) + .map_err(|e| DomainError::Internal(e.to_string()))?; + let inbox = url::Url::parse(author_inbox_url) + .map_err(|e| DomainError::Internal(e.to_string()))?; + self.inner + .broadcast_undo_like_to_inbox(liker_user_id.as_uuid(), object, inbox) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + + async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError> { + self.inner + .broadcast_actor_update(user_id.as_uuid()) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } +} + +// ── FederationSchedulerPort ─────────────────────────────────────────────────── + +#[async_trait] +impl FederationSchedulerPort for ApFederationAdapter { + async fn schedule_actor_posts_fetch( + &self, + actor_ap_url: &str, + outbox_url: &str, + ) -> Result<(), DomainError> { + let service = self.inner.clone(); + let actor = actor_ap_url.to_string(); + let outbox = outbox_url.to_string(); + tokio::spawn(async move { + if let Err(e) = service.backfill_outbox(&outbox, &actor).await { + tracing::warn!(actor = %actor, error = %e, "posts backfill failed"); + } + }); + Ok(()) + } + + async fn schedule_connections_fetch( + &self, + actor_ap_url: &str, + collection_url: &str, + connection_type: &str, + page: u32, + ) -> Result<(), DomainError> { + if page != 1 { + return Ok(()); + } + let actor = actor_ap_url.to_string(); + let collection = collection_url.to_string(); + let conn_type = connection_type.to_string(); + let connections_repo = self.connections_repo.clone(); + tokio::spawn(async move { + let client = match reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS)) + .build() + { + Ok(c) => c, + Err(e) => { + tracing::warn!(error = %e, "connections fetch: failed to build client"); + return; + } + }; + + let mut all_urls: Vec = Vec::new(); + let mut current_url: Option = Some(collection.clone()); + const MAX_ACTORS: usize = 500; + + while let Some(url) = current_url.take() { + let val: serde_json::Value = match client + .get(&url) + .header("Accept", "application/activity+json, application/ld+json") + .send() + .await + { + Ok(r) => match r.json().await { + Ok(v) => v, + Err(e) => { + tracing::warn!(error = %e, url = %url, "connections: parse error"); + break; + } + }, + Err(e) => { + tracing::warn!(error = %e, url = %url, "connections: HTTP error"); + break; + } + }; + + if val["type"].as_str() == Some("OrderedCollection") { + current_url = val["first"].as_str().map(|s| s.to_string()); + continue; + } + + let empty = vec![]; + let items = val["orderedItems"].as_array().unwrap_or(&empty); + for item in items { + let actor_url = + item.as_str().or_else(|| item["id"].as_str()).unwrap_or(""); + if !actor_url.is_empty() { + all_urls.push(actor_url.to_string()); + } + } + + if all_urls.len() >= MAX_ACTORS { + break; + } + current_url = val["next"].as_str().map(|s| s.to_string()); + if current_url.is_some() { + tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS)) + .await; + } + } + + if all_urls.is_empty() { + tracing::debug!( + actor = %actor, + connection_type = %conn_type, + "connections: empty collection" + ); + return; + } + + const PAGE_SIZE: usize = 20; + for (idx, chunk) in all_urls.chunks(PAGE_SIZE).enumerate() { + let page_num = (idx + 1) as u32; + let resolved = resolve_actor_profiles_from_urls(chunk.to_vec()).await; + if let Err(e) = connections_repo + .upsert_connections(&actor, &conn_type, page_num, &resolved) + .await + { + tracing::warn!(error = %e, "connections: upsert failed"); + } + } + + tracing::debug!( + actor = %actor, + connection_type = %conn_type, + count = all_urls.len(), + "connections fetch complete" + ); + }); + Ok(()) + } +} + +// ── FederationLookupPort ────────────────────────────────────────────────────── + +#[async_trait] +impl FederationLookupPort for ApFederationAdapter { + async fn lookup_actor(&self, handle: &str) -> Result { + let normalized = handle.trim_start_matches('@'); + let at = normalized.rfind('@').ok_or_else(|| { + DomainError::InvalidInput("handle must be user@domain".into()) + })?; + let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]); + + let wf_url = format!( + "https://{}/.well-known/webfinger?resource=acct:{}@{}", + domain_str, user, domain_str + ); + let wf: serde_json::Value = reqwest::Client::new() + .get(&wf_url) + .header("Accept", "application/jrd+json, application/json") + .send() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))? + .json() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + + let self_href = wf["links"] + .as_array() + .and_then(|links| { + links.iter().find(|l| { + l["rel"].as_str() == Some("self") + && l["type"].as_str() == Some("application/activity+json") + }) + }) + .and_then(|l| l["href"].as_str()) + .ok_or(DomainError::NotFound)? + .to_owned(); + + let actor_json: serde_json::Value = reqwest::Client::new() + .get(&self_href) + .header("Accept", "application/activity+json") + .send() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))? + .json() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + + let ap_url = actor_json["id"].as_str().unwrap_or(&self_href).to_string(); + let preferred_username = + actor_json["preferredUsername"].as_str().unwrap_or("").to_string(); + let domain_part = url::Url::parse(&ap_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + let full_handle = format!("{}@{}", preferred_username, domain_part); + + Ok(DomainRemoteActor { + url: ap_url.clone(), + handle: full_handle, + display_name: actor_json["name"].as_str().map(|s| s.to_string()), + avatar_url: actor_json["icon"]["url"].as_str().map(|s| s.to_string()), + outbox_url: actor_json["outbox"].as_str().map(|s| s.to_string()), + last_fetched_at: chrono::Utc::now(), + bio: actor_json["summary"].as_str().map(|s| s.to_string()), + banner_url: actor_json["image"]["url"].as_str().map(|s| s.to_string()), + also_known_as: None, + followers_url: actor_json["followers"].as_str().map(|s| s.to_string()), + following_url: actor_json["following"].as_str().map(|s| s.to_string()), + attachment: vec![], + }) + } + + async fn actor_json(&self, user_id: &UserId) -> Result { + self.inner + .actor_json(&user_id.as_uuid().to_string()) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn followers_collection_json( + &self, + user_id: &UserId, + page: Option, + ) -> Result { + self.inner + .followers_collection_json(user_id.as_uuid(), page) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn following_collection_json( + &self, + user_id: &UserId, + page: Option, + ) -> Result { + self.inner + .following_collection_json(user_id.as_uuid(), page) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } +} + +// ── FederationFetchPort ─────────────────────────────────────────────────────── + +#[async_trait] +impl FederationFetchPort for ApFederationAdapter { + async fn fetch_outbox_page( + &self, + outbox_url: &str, + page: u32, + ) -> Result, DomainError> { + use chrono::DateTime; + + let client = reqwest::Client::new(); + let base: serde_json::Value = client + .get(outbox_url) + .header("Accept", "application/activity+json, application/ld+json") + .send() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))? + .json() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + + let url = base["first"] + .as_str() + .map(|s| s.to_string()) + .unwrap_or_else(|| format!("{}?page={}", outbox_url, page)); + + let resp: serde_json::Value = client + .get(&url) + .header("Accept", "application/activity+json, application/ld+json") + .send() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))? + .json() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + + let empty = vec![]; + let items = resp["orderedItems"].as_array().unwrap_or(&empty); + + let notes = items + .iter() + .filter_map(|item| { + let note = if item["type"].as_str() == Some("Create") { + &item["object"] + } else if item["type"].as_str() == Some("Note") { + item + } else { + return None; + }; + + let to = note["to"].as_array()?; + let is_public = to + .iter() + .any(|t| t.as_str() == Some("https://www.w3.org/ns/activitystreams#Public")); + if !is_public { + return None; + } + + let published = + DateTime::parse_from_rfc3339(note["published"].as_str()?) + .ok()? + .with_timezone(&chrono::Utc); + + let text = note["content"].as_str().unwrap_or("").to_string(); + let has_attachments = note["attachment"] + .as_array() + .map(|a| !a.is_empty()) + .unwrap_or(false); + + let content = if has_attachments { + let notice = + "

📎 Media attachment — not supported

"; + if text.is_empty() { + notice.to_string() + } else { + format!("{text}{notice}") + } + } else { + text + }; + + Some(domain::models::remote_note::RemoteNote { + ap_id: note["id"].as_str()?.to_string(), + content, + published, + sensitive: note["sensitive"].as_bool().unwrap_or(false), + content_warning: note["summary"].as_str().map(|s| s.to_string()), + }) + }) + .collect(); + + Ok(notes) + } + + async fn fetch_actor_urls_from_collection( + &self, + collection_url: &str, + ) -> Result, DomainError> { + let client = reqwest::Client::new(); + let base: serde_json::Value = client + .get(collection_url) + .header("Accept", "application/activity+json, application/ld+json") + .send() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))? + .json() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + + let page = if base["orderedItems"].is_null() { + if let Some(first_url) = base["first"].as_str() { + client + .get(first_url) + .header("Accept", "application/activity+json, application/ld+json") + .send() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))? + .json() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))? + } else { + base + } + } else { + base + }; + + let empty = vec![]; + let items = page["orderedItems"].as_array().unwrap_or(&empty); + Ok(items + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect()) + } + + async fn resolve_actor_profiles( + &self, + urls: Vec, + ) -> Vec { + resolve_actor_profiles_from_urls(urls).await + } +} + +// ── FederationFollowPort ────────────────────────────────────────────────────── + +#[async_trait] +impl FederationFollowPort for ApFederationAdapter { + async fn follow_remote(&self, local_user_id: &UserId, handle: &str) -> Result<(), DomainError> { + self.inner + .follow(local_user_id.as_uuid(), handle) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn unfollow_remote( + &self, + local_user_id: &UserId, + handle: &str, + ) -> Result<(), DomainError> { + let actor_url = webfinger_resolve_actor_url(handle) + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + self.inner + .unfollow(local_user_id.as_uuid(), &actor_url) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn get_remote_following( + &self, + user_id: &UserId, + ) -> Result, DomainError> { + self.inner + .get_following(user_id.as_uuid()) + .await + .map(|v| v.into_iter().map(k_ap_actor_to_domain).collect()) + .map_err(|e| DomainError::ExternalService(e.to_string())) + } +} + +// ── FederationFollowRequestPort ─────────────────────────────────────────────── + +#[async_trait] +impl FederationFollowRequestPort for ApFederationAdapter { + async fn get_pending_followers( + &self, + user_id: &UserId, + ) -> Result, DomainError> { + self.inner + .get_pending_followers(user_id.as_uuid()) + .await + .map(|v| v.into_iter().map(k_ap_actor_to_domain).collect()) + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn accept_follow_request( + &self, + user_id: &UserId, + actor_url: &str, + ) -> Result<(), DomainError> { + self.inner + .accept_follower(user_id.as_uuid(), actor_url) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn reject_follow_request( + &self, + user_id: &UserId, + actor_url: &str, + ) -> Result<(), DomainError> { + self.inner + .reject_follower(user_id.as_uuid(), actor_url) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn get_remote_followers( + &self, + user_id: &UserId, + ) -> Result, DomainError> { + self.inner + .get_accepted_followers(user_id.as_uuid()) + .await + .map(|v| v.into_iter().map(k_ap_actor_to_domain).collect()) + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn remove_remote_follower( + &self, + user_id: &UserId, + actor_url: &str, + ) -> Result<(), DomainError> { + self.inner + .remove_follower(user_id.as_uuid(), actor_url) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } +} + +// FederationActionPort is a blanket supertrait; no explicit impl needed. diff --git a/crates/adapters/postgres-federation/Cargo.toml b/crates/adapters/postgres-federation/Cargo.toml index 0fc67e3..c8582b5 100644 --- a/crates/adapters/postgres-federation/Cargo.toml +++ b/crates/adapters/postgres-federation/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.0" } +k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.2" } sqlx = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } diff --git a/crates/bootstrap/Cargo.toml b/crates/bootstrap/Cargo.toml index b36542d..6e5e220 100644 --- a/crates/bootstrap/Cargo.toml +++ b/crates/bootstrap/Cargo.toml @@ -14,7 +14,7 @@ postgres = { workspace = true } postgres-search = { workspace = true } postgres-federation = { workspace = true } activitypub = { workspace = true } -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.0" } +k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.2" } nats = { workspace = true } event-transport = { workspace = true } auth = { workspace = true } diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 7556bfb..a15883e 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use sqlx::PgPool; use std::sync::Arc; -use activitypub::ThoughtsObjectHandler; +use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; use k_ap::ActivityPubService; use auth::ApiKeyServiceImpl; use domain::{ @@ -27,7 +27,7 @@ use crate::config::Config; /// Everything the binary needs to start serving. pub struct Infrastructure { pub state: AppState, - pub ap_service: Arc, + pub ap_service: Arc, } struct NoOpEventPublisher; @@ -72,7 +72,9 @@ pub async fn build(cfg: &Config) -> Infrastructure { }; // 3. ActivityPub federation - let ap_service = Arc::new( + let connections_repo = + Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); + let raw_ap_service = Arc::new( ActivityPubService::builder( Arc::new(PostgresFederationRepository::new(pool.clone())), Arc::new(PostgresApUserRepository::new( @@ -86,7 +88,6 @@ pub async fn build(cfg: &Config) -> Infrastructure { Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), )), cfg.base_url.clone(), - Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())), ) .allow_registration(cfg.allow_registration) .software_name("thoughts") @@ -95,6 +96,7 @@ pub async fn build(cfg: &Config) -> Infrastructure { .await .expect("Failed to build ActivityPubService"), ); + let ap_service = Arc::new(ApFederationAdapter::new(raw_ap_service, connections_repo)); // 4. Application state let state = AppState { diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 3524676..4f04741 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -13,7 +13,7 @@ application = { workspace = true } nats = { workspace = true } event-transport = { workspace = true } event-payload = { workspace = true } -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.0" } +k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.2" } activitypub = { workspace = true } postgres = { workspace = true } postgres-federation = { workspace = true } diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index cc8022e..0604a04 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -3,7 +3,7 @@ use postgres::remote_actor_connections::PgRemoteActorConnectionRepository; use sqlx::PgPool; use std::sync::Arc; -use activitypub::ThoughtsObjectHandler; +use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; use activitypub::{ActivityPubRepository, OutboundFederationPort}; use k_ap::ActivityPubService; use application::services::{FederationEventService, NotificationEventService}; @@ -39,7 +39,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker )); // ActivityPub service (for federation fan-out) - let ap_service = Arc::new( + let connections_repo_worker = + Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); + let raw_ap_service = Arc::new( ActivityPubService::builder( Arc::new(PostgresFederationRepository::new(pool.clone())), Arc::new(PostgresApUserRepository::new( @@ -53,13 +55,13 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), )), base_url, - Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())), ) .software_name("thoughts") .build() .await .expect("ActivityPubService build failed"), ); + let ap_service = Arc::new(ApFederationAdapter::new(raw_ap_service, connections_repo_worker)); let ap_outbound = ap_service.clone() as Arc; let ap_repo_worker = Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc;