From 2080fec347eb419dec4e874eecf817d202d09060 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 10:23:35 +0200 Subject: [PATCH] feat(activitypub): ThoughtNote AP object and ThoughtsObjectHandler --- crates/adapters/activitypub/Cargo.toml | 15 +++ crates/adapters/activitypub/src/handler.rs | 137 +++++++++++++++++++++ crates/adapters/activitypub/src/lib.rs | 7 ++ crates/adapters/activitypub/src/note.rs | 62 ++++++++++ crates/adapters/activitypub/src/urls.rs | 49 ++++++++ 5 files changed, 270 insertions(+) create mode 100644 crates/adapters/activitypub/src/handler.rs create mode 100644 crates/adapters/activitypub/src/note.rs create mode 100644 crates/adapters/activitypub/src/urls.rs diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml index 5928d2e..94e3001 100644 --- a/crates/adapters/activitypub/Cargo.toml +++ b/crates/adapters/activitypub/Cargo.toml @@ -2,3 +2,18 @@ name = "activitypub" version = "0.1.0" edition = "2021" + +[dependencies] +activitypub-base = { workspace = true } +activitypub_federation = "0.7.0-beta.11" +domain = { workspace = true } +postgres = { workspace = true } +sqlx = { workspace = true } +url = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +anyhow = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } diff --git a/crates/adapters/activitypub/src/handler.rs b/crates/adapters/activitypub/src/handler.rs new file mode 100644 index 0000000..fc45c91 --- /dev/null +++ b/crates/adapters/activitypub/src/handler.rs @@ -0,0 +1,137 @@ +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use url::Url; + +use activitypub_base::ApObjectHandler; +use crate::note::ThoughtNote; +use crate::urls::ThoughtsUrls; + +pub struct ThoughtsObjectHandler { + pool: PgPool, + urls: ThoughtsUrls, +} + +impl ThoughtsObjectHandler { + pub fn new(pool: PgPool, base_url: &str) -> Self { + Self { pool, urls: ThoughtsUrls::new(base_url) } + } +} + +#[async_trait] +impl ApObjectHandler for ThoughtsObjectHandler { + async fn get_local_objects_for_user(&self, user_id: uuid::Uuid) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { id: uuid::Uuid, content: String, created_at: DateTime, in_reply_to_id: Option, content_warning: Option, sensitive: bool, username: String } + let rows = sqlx::query_as::<_, Row>( + "SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username + FROM thoughts t JOIN users u ON u.id=t.user_id + WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'" + ).bind(user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e))?; + + let mut result = Vec::new(); + for r in rows { + let note_url = self.urls.thought_url(r.id); + let actor_url = self.urls.user_url(&r.username); + let followers_url = self.urls.user_followers(&r.username); + let in_reply_to = r.in_reply_to_id.map(|id| self.urls.thought_url(id)); + let note = ThoughtNote::new_public(note_url.clone(), actor_url, r.content, r.created_at, in_reply_to, r.sensitive, r.content_warning, followers_url); + result.push((note_url, serde_json::to_value(¬e)?)); + } + Ok(result) + } + + async fn get_local_objects_page( + &self, user_id: uuid::Uuid, before: Option>, limit: usize, + ) -> Result)>> { + #[derive(sqlx::FromRow)] + struct Row { id: uuid::Uuid, content: String, created_at: DateTime, in_reply_to_id: Option, content_warning: Option, sensitive: bool, username: String } + let rows = if let Some(before) = before { + sqlx::query_as::<_, Row>( + "SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username + FROM thoughts t JOIN users u ON u.id=t.user_id + WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2 + ORDER BY t.created_at DESC LIMIT $3" + ).bind(user_id).bind(before).bind(limit as i64).fetch_all(&self.pool).await + } else { + sqlx::query_as::<_, Row>( + "SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username + FROM thoughts t JOIN users u ON u.id=t.user_id + WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' + ORDER BY t.created_at DESC LIMIT $2" + ).bind(user_id).bind(limit as i64).fetch_all(&self.pool).await + }.map_err(|e| anyhow!(e))?; + + let mut result = Vec::new(); + for r in rows { + let note_url = self.urls.thought_url(r.id); + let actor_url = self.urls.user_url(&r.username); + let followers_url = self.urls.user_followers(&r.username); + let in_reply_to = r.in_reply_to_id.map(|id| self.urls.thought_url(id)); + let note = ThoughtNote::new_public(note_url.clone(), actor_url, r.content.clone(), r.created_at, in_reply_to, r.sensitive, r.content_warning, followers_url); + result.push((note_url, serde_json::to_value(¬e)?, r.created_at)); + } + Ok(result) + } + + async fn on_create(&self, ap_id: &Url, actor_url: &Url, object: serde_json::Value) -> Result<()> { + let note: ThoughtNote = serde_json::from_value(object)?; + let actor_url_str = actor_url.to_string(); + + // Find or create a remote user placeholder + let existing: Option = sqlx::query_scalar( + "SELECT id FROM users WHERE ap_id=$1" + ).bind(&actor_url_str).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e))?; + + let user_id = match existing { + Some(id) => id, + None => { + let uid = uuid::Uuid::new_v4(); + let handle = actor_url.path().trim_start_matches('/').replace('/', "_"); + sqlx::query( + "INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at) + VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT DO NOTHING" + ).bind(uid).bind(&handle).bind(format!("{}@remote", uid)).bind(&actor_url_str) + .execute(&self.pool).await.map_err(|e| anyhow!(e))?; + uid + } + }; + + let thought_id = uuid::Uuid::new_v4(); + let content: String = note.content.chars().take(500).collect(); + sqlx::query( + "INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at) + VALUES($1,$2,$3,$4,'public',$5,false,$6,$7) ON CONFLICT(ap_id) DO NOTHING" + ).bind(thought_id).bind(user_id).bind(&content).bind(ap_id.as_str()) + .bind(note.sensitive).bind(note.summary).bind(note.published) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn on_update(&self, ap_id: &Url, _actor_url: &Url, object: serde_json::Value) -> Result<()> { + let note: ThoughtNote = serde_json::from_value(object)?; + let content: String = note.content.chars().take(500).collect(); + sqlx::query("UPDATE thoughts SET content=$2, updated_at=NOW() WHERE ap_id=$1") + .bind(ap_id.as_str()).bind(&content) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> { + sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false") + .bind(ap_id.as_str()) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> { + sqlx::query( + "DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)" + ).bind(actor_url.as_str()) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn count_local_posts(&self) -> Result { + let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thoughts WHERE local=true") + .fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(n as u64) + } +} diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index e69de29..5f8d6bd 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -0,0 +1,7 @@ +pub mod handler; +pub mod note; +pub mod urls; + +pub use handler::ThoughtsObjectHandler; +pub use note::ThoughtNote; +pub use urls::ThoughtsUrls; diff --git a/crates/adapters/activitypub/src/note.rs b/crates/adapters/activitypub/src/note.rs new file mode 100644 index 0000000..1cbaa65 --- /dev/null +++ b/crates/adapters/activitypub/src/note.rs @@ -0,0 +1,62 @@ +use activitypub_base::AS_PUBLIC; +use activitypub_federation::kinds::object::NoteType; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use url::Url; + +/// AP Note representing a Thought. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ThoughtNote { + #[serde(rename = "type")] + pub kind: NoteType, + pub id: Url, + pub attributed_to: Url, + pub content: String, + pub published: DateTime, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub cc: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub in_reply_to: Option, + pub sensitive: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub summary: Option, +} + +impl ThoughtNote { + pub fn new_public( + id: Url, actor_url: Url, content: String, published: DateTime, + in_reply_to: Option, sensitive: bool, summary: Option, + followers_url: Url, + ) -> Self { + Self { + kind: Default::default(), + id, attributed_to: actor_url, content, published, + to: vec![AS_PUBLIC.to_string()], + cc: vec![followers_url.to_string()], + in_reply_to, sensitive, summary, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn note_serializes_with_public_audience() { + let note = ThoughtNote::new_public( + "https://example.com/thoughts/1".parse().unwrap(), + "https://example.com/users/alice".parse().unwrap(), + "Hello world".to_string(), + chrono::Utc::now(), + None, false, None, + "https://example.com/users/alice/followers".parse().unwrap(), + ); + let json = serde_json::to_string(¬e).unwrap(); + assert!(json.contains(AS_PUBLIC)); + assert!(json.contains("Hello world")); + } +} diff --git a/crates/adapters/activitypub/src/urls.rs b/crates/adapters/activitypub/src/urls.rs new file mode 100644 index 0000000..5f7bf82 --- /dev/null +++ b/crates/adapters/activitypub/src/urls.rs @@ -0,0 +1,49 @@ +use url::Url; + +pub struct ThoughtsUrls { + pub base_url: String, +} + +impl ThoughtsUrls { + pub fn new(base_url: &str) -> Self { + Self { base_url: base_url.trim_end_matches('/').to_string() } + } + + pub fn user_url(&self, username: &str) -> Url { + Url::parse(&format!("{}/users/{}", self.base_url, username)).expect("valid URL") + } + + pub fn thought_url(&self, thought_id: uuid::Uuid) -> Url { + Url::parse(&format!("{}/thoughts/{}", self.base_url, thought_id)).expect("valid URL") + } + + pub fn user_inbox(&self, username: &str) -> Url { + Url::parse(&format!("{}/users/{}/inbox", self.base_url, username)).expect("valid URL") + } + + pub fn user_outbox(&self, username: &str) -> Url { + Url::parse(&format!("{}/users/{}/outbox", self.base_url, username)).expect("valid URL") + } + + pub fn user_followers(&self, username: &str) -> Url { + Url::parse(&format!("{}/users/{}/followers", self.base_url, username)).expect("valid URL") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn user_url_format() { + let urls = ThoughtsUrls::new("https://example.com"); + assert_eq!(urls.user_url("alice").as_str(), "https://example.com/users/alice"); + } + + #[test] + fn thought_url_format() { + let urls = ThoughtsUrls::new("https://example.com"); + let id = uuid::Uuid::nil(); + assert!(urls.thought_url(id).as_str().starts_with("https://example.com/thoughts/")); + } +}