diff --git a/Cargo.lock b/Cargo.lock
index f9a52ac..4b07b14 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5,22 +5,6 @@ version = 4
[[package]]
name = "activitypub"
version = "0.1.0"
-dependencies = [
- "activitypub-base",
- "anyhow",
- "async-trait",
- "chrono",
- "domain",
- "serde",
- "serde_json",
- "tracing",
- "url",
- "uuid",
-]
-
-[[package]]
-name = "activitypub-base"
-version = "0.1.0"
dependencies = [
"activitypub_federation",
"anyhow",
@@ -28,8 +12,8 @@ dependencies = [
"axum",
"chrono",
"domain",
- "enum_delegate",
"futures",
+ "k-ap",
"reqwest",
"serde",
"serde_json",
@@ -289,7 +273,7 @@ dependencies = [
name = "application"
version = "0.1.0"
dependencies = [
- "activitypub-base",
+ "activitypub",
"async-trait",
"chrono",
"domain",
@@ -596,7 +580,6 @@ name = "bootstrap"
version = "0.1.0"
dependencies = [
"activitypub",
- "activitypub-base",
"async-nats",
"async-trait",
"auth",
@@ -605,6 +588,7 @@ dependencies = [
"dotenvy",
"event-transport",
"http 1.4.0",
+ "k-ap",
"nats",
"postgres",
"postgres-federation",
@@ -2005,6 +1989,27 @@ dependencies = [
"simple_asn1",
]
+[[package]]
+name = "k-ap"
+version = "0.1.0"
+source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.2#767b1e69d4f384093ea33d72d5aa46ff140f5ac8"
+dependencies = [
+ "activitypub_federation",
+ "anyhow",
+ "async-trait",
+ "axum",
+ "chrono",
+ "enum_delegate",
+ "futures",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tracing",
+ "url",
+ "uuid",
+]
+
[[package]]
name = "language-tags"
version = "0.3.2"
@@ -2452,7 +2457,7 @@ checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49"
name = "postgres"
version = "0.1.0"
dependencies = [
- "activitypub-base",
+ "activitypub",
"async-trait",
"chrono",
"domain",
@@ -2470,10 +2475,10 @@ dependencies = [
name = "postgres-federation"
version = "0.1.0"
dependencies = [
- "activitypub-base",
"anyhow",
"async-trait",
"chrono",
+ "k-ap",
"sqlx",
"tokio",
"tracing",
@@ -2522,7 +2527,7 @@ dependencies = [
name = "presentation"
version = "0.1.0"
dependencies = [
- "activitypub-base",
+ "activitypub",
"api-types",
"application",
"async-trait",
@@ -4715,7 +4720,6 @@ name = "worker"
version = "0.1.0"
dependencies = [
"activitypub",
- "activitypub-base",
"application",
"async-nats",
"domain",
@@ -4723,6 +4727,7 @@ dependencies = [
"event-payload",
"event-transport",
"futures",
+ "k-ap",
"nats",
"postgres",
"postgres-federation",
diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml
index c2ca55f..3ad9899 100644
--- a/crates/adapters/activitypub/Cargo.toml
+++ b/crates/adapters/activitypub/Cargo.toml
@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
-k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.0" }
+k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.2" }
domain = { workspace = true }
url = { workspace = true }
serde = { workspace = true }
@@ -15,3 +15,7 @@ uuid = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
activitypub_federation = "0.7.0-beta.11"
+reqwest = { workspace = true }
+futures = { workspace = true }
+tokio = { workspace = true }
+axum = { workspace = true }
diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs
index 13ef1d1..837dc26 100644
--- a/crates/adapters/activitypub/src/lib.rs
+++ b/crates/adapters/activitypub/src/lib.rs
@@ -1,9 +1,11 @@
pub mod handler;
pub mod note;
pub mod port;
+pub mod service;
pub mod urls;
pub use handler::ThoughtsObjectHandler;
pub use note::ThoughtNote;
pub use port::{AcceptNoteInput, ActivityPubRepository, ActorApUrls, OutboundFederationPort, OutboxEntry};
+pub use service::ApFederationAdapter;
pub use urls::ThoughtsUrls;
diff --git a/crates/adapters/activitypub/src/service.rs b/crates/adapters/activitypub/src/service.rs
new file mode 100644
index 0000000..672f4ba
--- /dev/null
+++ b/crates/adapters/activitypub/src/service.rs
@@ -0,0 +1,830 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use k_ap::ActivityPubService;
+
+use domain::{
+ errors::DomainError,
+ models::remote_actor::RemoteActor as DomainRemoteActor,
+ ports::{
+ FederationFetchPort, FederationFollowPort, FederationFollowRequestPort,
+ FederationLookupPort, FederationSchedulerPort, RemoteActorConnectionRepository,
+ },
+ value_objects::UserId,
+};
+
+const HTTP_FETCH_TIMEOUT_SECS: u64 = 30;
+const BATCH_FETCH_SLEEP_MS: u64 = 100;
+
+// ── Helpers ───────────────────────────────────────────────────────────────────
+
+fn content_to_html(text: &str) -> String {
+ let escaped = text
+ .replace('&', "&")
+ .replace('<', "<")
+ .replace('>', ">")
+ .replace('"', """);
+ let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect();
+ if paragraphs.is_empty() {
+ format!("
{}
", escaped)
+ } else {
+ paragraphs
+ .iter()
+ .map(|p| format!("{}
", p))
+ .collect::>()
+ .join("")
+ }
+}
+
+fn build_note_json(
+ thought: &domain::models::thought::Thought,
+ local_actor_ap_id: &str,
+ local_actor_followers_url: &str,
+ base_url: &str,
+ in_reply_to_url: Option<&str>,
+) -> serde_json::Value {
+ let ap_id = format!("{}/thoughts/{}", base_url, thought.id);
+
+ let (to, cc) = match thought.visibility {
+ domain::models::thought::Visibility::Public => (
+ vec![k_ap::AS_PUBLIC.to_string()],
+ vec![local_actor_followers_url.to_string()],
+ ),
+ domain::models::thought::Visibility::Unlisted => (
+ vec![local_actor_followers_url.to_string()],
+ vec![k_ap::AS_PUBLIC.to_string()],
+ ),
+ domain::models::thought::Visibility::Followers => {
+ (vec![local_actor_followers_url.to_string()], vec![])
+ }
+ domain::models::thought::Visibility::Direct => (vec![], vec![]),
+ };
+
+ let mut note = serde_json::json!({
+ "type": "Note",
+ "id": ap_id,
+ "url": ap_id,
+ "attributedTo": local_actor_ap_id,
+ "content": content_to_html(thought.content.as_str()),
+ "published": thought.created_at.to_rfc3339(),
+ "to": to,
+ "cc": cc,
+ "sensitive": thought.sensitive,
+ });
+ if let Some(ref cw) = thought.content_warning {
+ note["summary"] = serde_json::json!(cw);
+ }
+ if let Some(reply_url) = in_reply_to_url {
+ note["inReplyTo"] = serde_json::json!(reply_url);
+ }
+ if let Some(updated_at) = thought.updated_at {
+ note["updated"] = serde_json::json!(updated_at.to_rfc3339());
+ }
+ let hashtags = domain::hashtag::extract(thought.content.as_str());
+ if !hashtags.is_empty() {
+ let ap_tags: Vec = hashtags
+ .iter()
+ .map(|h| {
+ serde_json::json!({
+ "type": "Hashtag",
+ "name": h.ap_name,
+ "href": format!("{}/{}", base_url, h.url_slug),
+ })
+ })
+ .collect();
+ note["tag"] = serde_json::json!(ap_tags);
+ }
+ note
+}
+
+fn k_ap_actor_to_domain(a: k_ap::RemoteActor) -> DomainRemoteActor {
+ DomainRemoteActor {
+ url: a.url,
+ handle: a.handle,
+ display_name: a.display_name,
+ avatar_url: a.avatar_url,
+ outbox_url: a.outbox_url,
+ last_fetched_at: chrono::Utc::now(),
+ bio: None,
+ banner_url: None,
+ also_known_as: None,
+ followers_url: None,
+ following_url: None,
+ attachment: vec![],
+ }
+}
+
+async fn resolve_actor_profiles_from_urls(
+ urls: Vec,
+) -> Vec {
+ use futures::future;
+
+ async fn fetch_one(
+ url: String,
+ ) -> Option {
+ let resp: serde_json::Value = tokio::time::timeout(
+ std::time::Duration::from_secs(5),
+ reqwest::Client::new()
+ .get(&url)
+ .header("Accept", "application/activity+json")
+ .send(),
+ )
+ .await
+ .ok()?
+ .ok()?
+ .json()
+ .await
+ .ok()?;
+
+ let ap_url = resp["id"].as_str()?.to_string();
+ let preferred_username = resp["preferredUsername"].as_str().unwrap_or("").to_string();
+ let domain_str = url::Url::parse(&ap_url)
+ .ok()
+ .and_then(|u| u.host_str().map(|s| s.to_string()))
+ .unwrap_or_default();
+ let handle = format!("{}@{}", preferred_username, domain_str);
+ let display_name = resp["name"].as_str().map(|s| s.to_string());
+ let avatar_url = resp["icon"]["url"].as_str().map(|s| s.to_string());
+
+ Some(domain::models::actor_connection_summary::ActorConnectionSummary {
+ url: ap_url,
+ handle,
+ display_name,
+ avatar_url,
+ })
+ }
+
+ let futs: Vec<_> = urls.into_iter().map(fetch_one).collect();
+ let results = future::join_all(futs).await;
+
+ results
+ .into_iter()
+ .filter_map(|r| {
+ if r.is_none() {
+ tracing::warn!("failed to resolve actor profile (timeout or parse error)");
+ }
+ r
+ })
+ .collect()
+}
+
+async fn webfinger_resolve_actor_url(handle: &str) -> anyhow::Result {
+ let normalized = handle.trim_start_matches('@');
+ let at = normalized
+ .rfind('@')
+ .ok_or_else(|| anyhow::anyhow!("handle must be user@domain"))?;
+ let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]);
+ let wf_url = format!(
+ "https://{}/.well-known/webfinger?resource=acct:{}@{}",
+ domain_str, user, domain_str
+ );
+ let wf: serde_json::Value = reqwest::Client::new()
+ .get(&wf_url)
+ .header("Accept", "application/jrd+json, application/json")
+ .send()
+ .await?
+ .json()
+ .await?;
+ let self_href = wf["links"]
+ .as_array()
+ .and_then(|links| {
+ links.iter().find(|l| {
+ l["rel"].as_str() == Some("self")
+ && l["type"].as_str() == Some("application/activity+json")
+ })
+ })
+ .and_then(|l| l["href"].as_str())
+ .ok_or_else(|| anyhow::anyhow!("no self link in WebFinger response"))?
+ .to_owned();
+ Ok(self_href)
+}
+
+// ── ApFederationAdapter ───────────────────────────────────────────────────────
+
+/// Wraps `k_ap::ActivityPubService` together with the `RemoteActorConnectionRepository`
+/// (which k-ap doesn't own), and implements all domain federation port traits.
+#[derive(Clone)]
+pub struct ApFederationAdapter {
+ pub(crate) inner: Arc,
+ pub(crate) connections_repo: Arc,
+}
+
+impl ApFederationAdapter {
+ pub fn new(
+ inner: Arc,
+ connections_repo: Arc,
+ ) -> Self {
+ Self {
+ inner,
+ connections_repo,
+ }
+ }
+
+ pub fn router(&self) -> axum::Router
+ where
+ S: Clone + Send + Sync + 'static,
+ {
+ self.inner.router()
+ }
+
+ fn base_url(&self) -> &str {
+ self.inner.base_url()
+ }
+
+ fn actor_ap_id(&self, user_uuid: uuid::Uuid) -> String {
+ format!("{}/users/{}", self.base_url(), user_uuid)
+ }
+
+ fn actor_followers_url(&self, user_uuid: uuid::Uuid) -> String {
+ format!("{}/followers", self.actor_ap_id(user_uuid))
+ }
+}
+
+// ── OutboundFederationPort ────────────────────────────────────────────────────
+
+#[async_trait]
+impl crate::port::OutboundFederationPort for ApFederationAdapter {
+ async fn broadcast_create(
+ &self,
+ author_user_id: &UserId,
+ thought: &domain::models::thought::Thought,
+ _author_username: &str,
+ in_reply_to_url: Option<&str>,
+ ) -> Result<(), DomainError> {
+ let user_uuid = author_user_id.as_uuid();
+ let ap_id = self.actor_ap_id(user_uuid);
+ let followers_url = self.actor_followers_url(user_uuid);
+ let note = build_note_json(thought, &ap_id, &followers_url, self.base_url(), in_reply_to_url);
+ self.inner
+ .broadcast_create_note(user_uuid, note)
+ .await
+ .map_err(|e| DomainError::Internal(e.to_string()))
+ }
+
+ async fn broadcast_delete(
+ &self,
+ author_user_id: &UserId,
+ thought_ap_id: &str,
+ ) -> Result<(), DomainError> {
+ let ap_id = url::Url::parse(thought_ap_id)
+ .map_err(|e| DomainError::Internal(e.to_string()))?;
+ self.inner
+ .broadcast_delete_to_followers(author_user_id.as_uuid(), ap_id)
+ .await
+ .map_err(|e| DomainError::Internal(e.to_string()))
+ }
+
+ async fn broadcast_update(
+ &self,
+ author_user_id: &UserId,
+ thought: &domain::models::thought::Thought,
+ _author_username: &str,
+ in_reply_to_url: Option<&str>,
+ ) -> Result<(), DomainError> {
+ let user_uuid = author_user_id.as_uuid();
+ let ap_id = self.actor_ap_id(user_uuid);
+ let followers_url = self.actor_followers_url(user_uuid);
+ let note = build_note_json(thought, &ap_id, &followers_url, self.base_url(), in_reply_to_url);
+ self.inner
+ .broadcast_update_note(user_uuid, note)
+ .await
+ .map_err(|e| DomainError::Internal(e.to_string()))
+ }
+
+ async fn broadcast_announce(
+ &self,
+ booster_user_id: &UserId,
+ object_ap_id: &str,
+ ) -> Result<(), DomainError> {
+ let ap_id = url::Url::parse(object_ap_id)
+ .map_err(|e| DomainError::Internal(e.to_string()))?;
+ self.inner
+ .broadcast_announce_to_followers(booster_user_id.as_uuid(), ap_id)
+ .await
+ .map_err(|e| DomainError::Internal(e.to_string()))
+ }
+
+ async fn broadcast_undo_announce(
+ &self,
+ booster_user_id: &UserId,
+ object_ap_id: &str,
+ ) -> Result<(), DomainError> {
+ let ap_id = url::Url::parse(object_ap_id)
+ .map_err(|e| DomainError::Internal(e.to_string()))?;
+ self.inner
+ .broadcast_undo_announce_to_followers(booster_user_id.as_uuid(), ap_id)
+ .await
+ .map_err(|e| DomainError::Internal(e.to_string()))
+ }
+
+ async fn broadcast_like(
+ &self,
+ liker_user_id: &UserId,
+ object_ap_id: &str,
+ author_inbox_url: &str,
+ ) -> Result<(), DomainError> {
+ let object = url::Url::parse(object_ap_id)
+ .map_err(|e| DomainError::Internal(e.to_string()))?;
+ let inbox = url::Url::parse(author_inbox_url)
+ .map_err(|e| DomainError::Internal(e.to_string()))?;
+ self.inner
+ .broadcast_like_to_inbox(liker_user_id.as_uuid(), object, inbox)
+ .await
+ .map_err(|e| DomainError::Internal(e.to_string()))
+ }
+
+ async fn broadcast_undo_like(
+ &self,
+ liker_user_id: &UserId,
+ object_ap_id: &str,
+ author_inbox_url: &str,
+ ) -> Result<(), DomainError> {
+ let object = url::Url::parse(object_ap_id)
+ .map_err(|e| DomainError::Internal(e.to_string()))?;
+ let inbox = url::Url::parse(author_inbox_url)
+ .map_err(|e| DomainError::Internal(e.to_string()))?;
+ self.inner
+ .broadcast_undo_like_to_inbox(liker_user_id.as_uuid(), object, inbox)
+ .await
+ .map_err(|e| DomainError::Internal(e.to_string()))
+ }
+
+ async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError> {
+ self.inner
+ .broadcast_actor_update(user_id.as_uuid())
+ .await
+ .map_err(|e| DomainError::Internal(e.to_string()))
+ }
+}
+
+// ── FederationSchedulerPort ───────────────────────────────────────────────────
+
+#[async_trait]
+impl FederationSchedulerPort for ApFederationAdapter {
+ async fn schedule_actor_posts_fetch(
+ &self,
+ actor_ap_url: &str,
+ outbox_url: &str,
+ ) -> Result<(), DomainError> {
+ let service = self.inner.clone();
+ let actor = actor_ap_url.to_string();
+ let outbox = outbox_url.to_string();
+ tokio::spawn(async move {
+ if let Err(e) = service.backfill_outbox(&outbox, &actor).await {
+ tracing::warn!(actor = %actor, error = %e, "posts backfill failed");
+ }
+ });
+ Ok(())
+ }
+
+ async fn schedule_connections_fetch(
+ &self,
+ actor_ap_url: &str,
+ collection_url: &str,
+ connection_type: &str,
+ page: u32,
+ ) -> Result<(), DomainError> {
+ if page != 1 {
+ return Ok(());
+ }
+ let actor = actor_ap_url.to_string();
+ let collection = collection_url.to_string();
+ let conn_type = connection_type.to_string();
+ let connections_repo = self.connections_repo.clone();
+ tokio::spawn(async move {
+ let client = match reqwest::Client::builder()
+ .timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS))
+ .build()
+ {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::warn!(error = %e, "connections fetch: failed to build client");
+ return;
+ }
+ };
+
+ let mut all_urls: Vec = Vec::new();
+ let mut current_url: Option = Some(collection.clone());
+ const MAX_ACTORS: usize = 500;
+
+ while let Some(url) = current_url.take() {
+ let val: serde_json::Value = match client
+ .get(&url)
+ .header("Accept", "application/activity+json, application/ld+json")
+ .send()
+ .await
+ {
+ Ok(r) => match r.json().await {
+ Ok(v) => v,
+ Err(e) => {
+ tracing::warn!(error = %e, url = %url, "connections: parse error");
+ break;
+ }
+ },
+ Err(e) => {
+ tracing::warn!(error = %e, url = %url, "connections: HTTP error");
+ break;
+ }
+ };
+
+ if val["type"].as_str() == Some("OrderedCollection") {
+ current_url = val["first"].as_str().map(|s| s.to_string());
+ continue;
+ }
+
+ let empty = vec![];
+ let items = val["orderedItems"].as_array().unwrap_or(&empty);
+ for item in items {
+ let actor_url =
+ item.as_str().or_else(|| item["id"].as_str()).unwrap_or("");
+ if !actor_url.is_empty() {
+ all_urls.push(actor_url.to_string());
+ }
+ }
+
+ if all_urls.len() >= MAX_ACTORS {
+ break;
+ }
+ current_url = val["next"].as_str().map(|s| s.to_string());
+ if current_url.is_some() {
+ tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS))
+ .await;
+ }
+ }
+
+ if all_urls.is_empty() {
+ tracing::debug!(
+ actor = %actor,
+ connection_type = %conn_type,
+ "connections: empty collection"
+ );
+ return;
+ }
+
+ const PAGE_SIZE: usize = 20;
+ for (idx, chunk) in all_urls.chunks(PAGE_SIZE).enumerate() {
+ let page_num = (idx + 1) as u32;
+ let resolved = resolve_actor_profiles_from_urls(chunk.to_vec()).await;
+ if let Err(e) = connections_repo
+ .upsert_connections(&actor, &conn_type, page_num, &resolved)
+ .await
+ {
+ tracing::warn!(error = %e, "connections: upsert failed");
+ }
+ }
+
+ tracing::debug!(
+ actor = %actor,
+ connection_type = %conn_type,
+ count = all_urls.len(),
+ "connections fetch complete"
+ );
+ });
+ Ok(())
+ }
+}
+
+// ── FederationLookupPort ──────────────────────────────────────────────────────
+
+#[async_trait]
+impl FederationLookupPort for ApFederationAdapter {
+ async fn lookup_actor(&self, handle: &str) -> Result {
+ let normalized = handle.trim_start_matches('@');
+ let at = normalized.rfind('@').ok_or_else(|| {
+ DomainError::InvalidInput("handle must be user@domain".into())
+ })?;
+ let (user, domain_str) = (&normalized[..at], &normalized[at + 1..]);
+
+ let wf_url = format!(
+ "https://{}/.well-known/webfinger?resource=acct:{}@{}",
+ domain_str, user, domain_str
+ );
+ let wf: serde_json::Value = reqwest::Client::new()
+ .get(&wf_url)
+ .header("Accept", "application/jrd+json, application/json")
+ .send()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?
+ .json()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?;
+
+ let self_href = wf["links"]
+ .as_array()
+ .and_then(|links| {
+ links.iter().find(|l| {
+ l["rel"].as_str() == Some("self")
+ && l["type"].as_str() == Some("application/activity+json")
+ })
+ })
+ .and_then(|l| l["href"].as_str())
+ .ok_or(DomainError::NotFound)?
+ .to_owned();
+
+ let actor_json: serde_json::Value = reqwest::Client::new()
+ .get(&self_href)
+ .header("Accept", "application/activity+json")
+ .send()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?
+ .json()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?;
+
+ let ap_url = actor_json["id"].as_str().unwrap_or(&self_href).to_string();
+ let preferred_username =
+ actor_json["preferredUsername"].as_str().unwrap_or("").to_string();
+ let domain_part = url::Url::parse(&ap_url)
+ .ok()
+ .and_then(|u| u.host_str().map(|s| s.to_string()))
+ .unwrap_or_default();
+ let full_handle = format!("{}@{}", preferred_username, domain_part);
+
+ Ok(DomainRemoteActor {
+ url: ap_url.clone(),
+ handle: full_handle,
+ display_name: actor_json["name"].as_str().map(|s| s.to_string()),
+ avatar_url: actor_json["icon"]["url"].as_str().map(|s| s.to_string()),
+ outbox_url: actor_json["outbox"].as_str().map(|s| s.to_string()),
+ last_fetched_at: chrono::Utc::now(),
+ bio: actor_json["summary"].as_str().map(|s| s.to_string()),
+ banner_url: actor_json["image"]["url"].as_str().map(|s| s.to_string()),
+ also_known_as: None,
+ followers_url: actor_json["followers"].as_str().map(|s| s.to_string()),
+ following_url: actor_json["following"].as_str().map(|s| s.to_string()),
+ attachment: vec![],
+ })
+ }
+
+ async fn actor_json(&self, user_id: &UserId) -> Result {
+ self.inner
+ .actor_json(&user_id.as_uuid().to_string())
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+
+ async fn followers_collection_json(
+ &self,
+ user_id: &UserId,
+ page: Option,
+ ) -> Result {
+ self.inner
+ .followers_collection_json(user_id.as_uuid(), page)
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+
+ async fn following_collection_json(
+ &self,
+ user_id: &UserId,
+ page: Option,
+ ) -> Result {
+ self.inner
+ .following_collection_json(user_id.as_uuid(), page)
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+}
+
+// ── FederationFetchPort ───────────────────────────────────────────────────────
+
+#[async_trait]
+impl FederationFetchPort for ApFederationAdapter {
+ async fn fetch_outbox_page(
+ &self,
+ outbox_url: &str,
+ page: u32,
+ ) -> Result, DomainError> {
+ use chrono::DateTime;
+
+ let client = reqwest::Client::new();
+ let base: serde_json::Value = client
+ .get(outbox_url)
+ .header("Accept", "application/activity+json, application/ld+json")
+ .send()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?
+ .json()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?;
+
+ let url = base["first"]
+ .as_str()
+ .map(|s| s.to_string())
+ .unwrap_or_else(|| format!("{}?page={}", outbox_url, page));
+
+ let resp: serde_json::Value = client
+ .get(&url)
+ .header("Accept", "application/activity+json, application/ld+json")
+ .send()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?
+ .json()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?;
+
+ let empty = vec![];
+ let items = resp["orderedItems"].as_array().unwrap_or(&empty);
+
+ let notes = items
+ .iter()
+ .filter_map(|item| {
+ let note = if item["type"].as_str() == Some("Create") {
+ &item["object"]
+ } else if item["type"].as_str() == Some("Note") {
+ item
+ } else {
+ return None;
+ };
+
+ let to = note["to"].as_array()?;
+ let is_public = to
+ .iter()
+ .any(|t| t.as_str() == Some("https://www.w3.org/ns/activitystreams#Public"));
+ if !is_public {
+ return None;
+ }
+
+ let published =
+ DateTime::parse_from_rfc3339(note["published"].as_str()?)
+ .ok()?
+ .with_timezone(&chrono::Utc);
+
+ let text = note["content"].as_str().unwrap_or("").to_string();
+ let has_attachments = note["attachment"]
+ .as_array()
+ .map(|a| !a.is_empty())
+ .unwrap_or(false);
+
+ let content = if has_attachments {
+ let notice =
+ "📎 Media attachment — not supported
";
+ if text.is_empty() {
+ notice.to_string()
+ } else {
+ format!("{text}{notice}")
+ }
+ } else {
+ text
+ };
+
+ Some(domain::models::remote_note::RemoteNote {
+ ap_id: note["id"].as_str()?.to_string(),
+ content,
+ published,
+ sensitive: note["sensitive"].as_bool().unwrap_or(false),
+ content_warning: note["summary"].as_str().map(|s| s.to_string()),
+ })
+ })
+ .collect();
+
+ Ok(notes)
+ }
+
+ async fn fetch_actor_urls_from_collection(
+ &self,
+ collection_url: &str,
+ ) -> Result, DomainError> {
+ let client = reqwest::Client::new();
+ let base: serde_json::Value = client
+ .get(collection_url)
+ .header("Accept", "application/activity+json, application/ld+json")
+ .send()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?
+ .json()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?;
+
+ let page = if base["orderedItems"].is_null() {
+ if let Some(first_url) = base["first"].as_str() {
+ client
+ .get(first_url)
+ .header("Accept", "application/activity+json, application/ld+json")
+ .send()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?
+ .json()
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?
+ } else {
+ base
+ }
+ } else {
+ base
+ };
+
+ let empty = vec![];
+ let items = page["orderedItems"].as_array().unwrap_or(&empty);
+ Ok(items
+ .iter()
+ .filter_map(|v| v.as_str().map(|s| s.to_string()))
+ .collect())
+ }
+
+ async fn resolve_actor_profiles(
+ &self,
+ urls: Vec,
+ ) -> Vec {
+ resolve_actor_profiles_from_urls(urls).await
+ }
+}
+
+// ── FederationFollowPort ──────────────────────────────────────────────────────
+
+#[async_trait]
+impl FederationFollowPort for ApFederationAdapter {
+ async fn follow_remote(&self, local_user_id: &UserId, handle: &str) -> Result<(), DomainError> {
+ self.inner
+ .follow(local_user_id.as_uuid(), handle)
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+
+ async fn unfollow_remote(
+ &self,
+ local_user_id: &UserId,
+ handle: &str,
+ ) -> Result<(), DomainError> {
+ let actor_url = webfinger_resolve_actor_url(handle)
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))?;
+ self.inner
+ .unfollow(local_user_id.as_uuid(), &actor_url)
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+
+ async fn get_remote_following(
+ &self,
+ user_id: &UserId,
+ ) -> Result, DomainError> {
+ self.inner
+ .get_following(user_id.as_uuid())
+ .await
+ .map(|v| v.into_iter().map(k_ap_actor_to_domain).collect())
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+}
+
+// ── FederationFollowRequestPort ───────────────────────────────────────────────
+
+#[async_trait]
+impl FederationFollowRequestPort for ApFederationAdapter {
+ async fn get_pending_followers(
+ &self,
+ user_id: &UserId,
+ ) -> Result, DomainError> {
+ self.inner
+ .get_pending_followers(user_id.as_uuid())
+ .await
+ .map(|v| v.into_iter().map(k_ap_actor_to_domain).collect())
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+
+ async fn accept_follow_request(
+ &self,
+ user_id: &UserId,
+ actor_url: &str,
+ ) -> Result<(), DomainError> {
+ self.inner
+ .accept_follower(user_id.as_uuid(), actor_url)
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+
+ async fn reject_follow_request(
+ &self,
+ user_id: &UserId,
+ actor_url: &str,
+ ) -> Result<(), DomainError> {
+ self.inner
+ .reject_follower(user_id.as_uuid(), actor_url)
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+
+ async fn get_remote_followers(
+ &self,
+ user_id: &UserId,
+ ) -> Result, DomainError> {
+ self.inner
+ .get_accepted_followers(user_id.as_uuid())
+ .await
+ .map(|v| v.into_iter().map(k_ap_actor_to_domain).collect())
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+
+ async fn remove_remote_follower(
+ &self,
+ user_id: &UserId,
+ actor_url: &str,
+ ) -> Result<(), DomainError> {
+ self.inner
+ .remove_follower(user_id.as_uuid(), actor_url)
+ .await
+ .map_err(|e| DomainError::ExternalService(e.to_string()))
+ }
+}
+
+// FederationActionPort is a blanket supertrait; no explicit impl needed.
diff --git a/crates/adapters/postgres-federation/Cargo.toml b/crates/adapters/postgres-federation/Cargo.toml
index 0fc67e3..c8582b5 100644
--- a/crates/adapters/postgres-federation/Cargo.toml
+++ b/crates/adapters/postgres-federation/Cargo.toml
@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
-k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.0" }
+k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.2" }
sqlx = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
diff --git a/crates/bootstrap/Cargo.toml b/crates/bootstrap/Cargo.toml
index b36542d..6e5e220 100644
--- a/crates/bootstrap/Cargo.toml
+++ b/crates/bootstrap/Cargo.toml
@@ -14,7 +14,7 @@ postgres = { workspace = true }
postgres-search = { workspace = true }
postgres-federation = { workspace = true }
activitypub = { workspace = true }
-k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.0" }
+k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.2" }
nats = { workspace = true }
event-transport = { workspace = true }
auth = { workspace = true }
diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs
index 7556bfb..a15883e 100644
--- a/crates/bootstrap/src/factory.rs
+++ b/crates/bootstrap/src/factory.rs
@@ -5,7 +5,7 @@ use async_trait::async_trait;
use sqlx::PgPool;
use std::sync::Arc;
-use activitypub::ThoughtsObjectHandler;
+use activitypub::{ApFederationAdapter, ThoughtsObjectHandler};
use k_ap::ActivityPubService;
use auth::ApiKeyServiceImpl;
use domain::{
@@ -27,7 +27,7 @@ use crate::config::Config;
/// Everything the binary needs to start serving.
pub struct Infrastructure {
pub state: AppState,
- pub ap_service: Arc,
+ pub ap_service: Arc,
}
struct NoOpEventPublisher;
@@ -72,7 +72,9 @@ pub async fn build(cfg: &Config) -> Infrastructure {
};
// 3. ActivityPub federation
- let ap_service = Arc::new(
+ let connections_repo =
+ Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()));
+ let raw_ap_service = Arc::new(
ActivityPubService::builder(
Arc::new(PostgresFederationRepository::new(pool.clone())),
Arc::new(PostgresApUserRepository::new(
@@ -86,7 +88,6 @@ pub async fn build(cfg: &Config) -> Infrastructure {
Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
)),
cfg.base_url.clone(),
- Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
)
.allow_registration(cfg.allow_registration)
.software_name("thoughts")
@@ -95,6 +96,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
.await
.expect("Failed to build ActivityPubService"),
);
+ let ap_service = Arc::new(ApFederationAdapter::new(raw_ap_service, connections_repo));
// 4. Application state
let state = AppState {
diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml
index 3524676..4f04741 100644
--- a/crates/worker/Cargo.toml
+++ b/crates/worker/Cargo.toml
@@ -13,7 +13,7 @@ application = { workspace = true }
nats = { workspace = true }
event-transport = { workspace = true }
event-payload = { workspace = true }
-k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.0" }
+k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.2" }
activitypub = { workspace = true }
postgres = { workspace = true }
postgres-federation = { workspace = true }
diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs
index cc8022e..0604a04 100644
--- a/crates/worker/src/factory.rs
+++ b/crates/worker/src/factory.rs
@@ -3,7 +3,7 @@ use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use sqlx::PgPool;
use std::sync::Arc;
-use activitypub::ThoughtsObjectHandler;
+use activitypub::{ApFederationAdapter, ThoughtsObjectHandler};
use activitypub::{ActivityPubRepository, OutboundFederationPort};
use k_ap::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService};
@@ -39,7 +39,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
));
// ActivityPub service (for federation fan-out)
- let ap_service = Arc::new(
+ let connections_repo_worker =
+ Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()));
+ let raw_ap_service = Arc::new(
ActivityPubService::builder(
Arc::new(PostgresFederationRepository::new(pool.clone())),
Arc::new(PostgresApUserRepository::new(
@@ -53,13 +55,13 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
)),
base_url,
- Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
)
.software_name("thoughts")
.build()
.await
.expect("ActivityPubService build failed"),
);
+ let ap_service = Arc::new(ApFederationAdapter::new(raw_ap_service, connections_repo_worker));
let ap_outbound = ap_service.clone() as Arc;
let ap_repo_worker =
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc;