From e0b0a71f1dfaf465090c9171d3b56c38fdb1b39f Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 10:55:58 +0200 Subject: [PATCH] feat(postgres): PgActivityPubRepository implementing ActivityPubRepository port --- crates/adapters/postgres/Cargo.toml | 1 + crates/adapters/postgres/src/activitypub.rs | 267 ++++++++++++++++++++ crates/adapters/postgres/src/lib.rs | 1 + 3 files changed, 269 insertions(+) create mode 100644 crates/adapters/postgres/src/activitypub.rs diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index e60faf0..0e20014 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -11,6 +11,7 @@ chrono = { workspace = true } async-trait = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/crates/adapters/postgres/src/activitypub.rs b/crates/adapters/postgres/src/activitypub.rs new file mode 100644 index 0000000..9d5e7d3 --- /dev/null +++ b/crates/adapters/postgres/src/activitypub.rs @@ -0,0 +1,267 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use url::Url; + +use domain::{ + errors::DomainError, + models::thought::{Thought, Visibility}, + ports::{ActivityPubRepository, OutboxEntry}, + value_objects::{Content, ThoughtId, UserId, Username}, +}; + +pub struct PgActivityPubRepository { + pool: PgPool, +} + +impl PgActivityPubRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl ActivityPubRepository for PgActivityPubRepository { + async fn outbox_entries_for_actor(&self, user_id: &UserId) -> Result, DomainError> { + #[derive(sqlx::FromRow)] + struct Row { + id: uuid::Uuid, + user_id: uuid::Uuid, + content: String, + created_at: DateTime, + in_reply_to_id: Option, + content_warning: Option, + sensitive: bool, + username: String, + updated_at: Option>, + } + sqlx::query_as::<_, Row>( + "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at + FROM thoughts t JOIN users u ON u.id=t.user_id + WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' + ORDER BY t.created_at DESC", + ) + .bind(user_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|rows| { + rows.into_iter() + .map(|r| OutboxEntry { + thought: Thought { + id: ThoughtId::from_uuid(r.id), + user_id: UserId::from_uuid(r.user_id), + content: Content::new_remote(r.content), + in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), + in_reply_to_url: None, + ap_id: None, + visibility: Visibility::Public, + content_warning: r.content_warning, + sensitive: r.sensitive, + local: true, + created_at: r.created_at, + updated_at: r.updated_at, + }, + author_username: Username::from_trusted(r.username), + }) + .collect() + }) + } + + async fn outbox_page_for_actor( + &self, + user_id: &UserId, + before: Option>, + limit: usize, + ) -> Result, DomainError> { + #[derive(sqlx::FromRow)] + struct Row { + id: uuid::Uuid, + user_id: uuid::Uuid, + content: String, + created_at: DateTime, + in_reply_to_id: Option, + content_warning: Option, + sensitive: bool, + username: String, + updated_at: Option>, + } + let rows = if let Some(before) = before { + sqlx::query_as::<_, Row>( + "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at + FROM thoughts t JOIN users u ON u.id=t.user_id + WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2 + ORDER BY t.created_at DESC LIMIT $3", + ) + .bind(user_id.as_uuid()) + .bind(before) + .bind(limit as i64) + .fetch_all(&self.pool) + .await + } else { + sqlx::query_as::<_, Row>( + "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at + FROM thoughts t JOIN users u ON u.id=t.user_id + WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' + ORDER BY t.created_at DESC LIMIT $2", + ) + .bind(user_id.as_uuid()) + .bind(limit as i64) + .fetch_all(&self.pool) + .await + } + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|r| OutboxEntry { + thought: Thought { + id: ThoughtId::from_uuid(r.id), + user_id: UserId::from_uuid(r.user_id), + content: Content::new_remote(r.content), + in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), + in_reply_to_url: None, + ap_id: None, + visibility: Visibility::Public, + content_warning: r.content_warning, + sensitive: r.sensitive, + local: true, + created_at: r.created_at, + updated_at: r.updated_at, + }, + author_username: Username::from_trusted(r.username), + }) + .collect()) + } + + async fn find_remote_actor_id(&self, actor_ap_url: &Url) -> Result, DomainError> { + sqlx::query_scalar::<_, uuid::Uuid>("SELECT id FROM users WHERE ap_id=$1") + .bind(actor_ap_url.as_str()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|o| o.map(UserId::from_uuid)) + } + + async fn intern_remote_actor(&self, actor_ap_url: &Url) -> Result { + if let Some(id) = self.find_remote_actor_id(actor_ap_url).await? { + return Ok(id); + } + let new_id = uuid::Uuid::new_v4(); + let handle = actor_ap_url.path().trim_start_matches('/').replace('/', "_"); + sqlx::query( + "INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at) + VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT(ap_id) DO NOTHING", + ) + .bind(new_id) + .bind(&handle) + .bind(format!("{}@remote", new_id)) + .bind(actor_ap_url.as_str()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + // Re-fetch to get whichever id won the race + self.find_remote_actor_id(actor_ap_url) + .await? + .ok_or_else(|| DomainError::Internal("intern_remote_actor: insert succeeded but row not found".into())) + } + + async fn accept_note( + &self, + ap_id: &Url, + author_id: &UserId, + content: &str, + published: DateTime, + sensitive: bool, + content_warning: Option, + ) -> Result<(), DomainError> { + let capped: String = content.chars().take(500).collect(); + sqlx::query( + "INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at) + VALUES($1,$2,$3,$4,'public',$5,false,$6,$7) ON CONFLICT(ap_id) DO NOTHING", + ) + .bind(uuid::Uuid::new_v4()) + .bind(author_id.as_uuid()) + .bind(&capped) + .bind(ap_id.as_str()) + .bind(sensitive) + .bind(content_warning) + .bind(published) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|_| ()) + } + + async fn apply_note_update(&self, ap_id: &Url, new_content: &str) -> Result<(), DomainError> { + let capped: String = new_content.chars().take(500).collect(); + sqlx::query("UPDATE thoughts SET content=$2,updated_at=NOW() WHERE ap_id=$1 AND local=false") + .bind(ap_id.as_str()) + .bind(&capped) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|_| ()) + } + + async fn retract_note(&self, ap_id: &Url) -> Result<(), DomainError> { + sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false") + .bind(ap_id.as_str()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|_| ()) + } + + async fn retract_actor_notes(&self, actor_ap_url: &Url) -> Result<(), DomainError> { + sqlx::query( + "DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)", + ) + .bind(actor_ap_url.as_str()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|_| ()) + } + + async fn count_local_notes(&self) -> Result { + let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thoughts WHERE local=true") + .fetch_one(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(n as u64) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::ports::ActivityPubRepository; + + #[sqlx::test(migrations = "./migrations")] + async fn intern_remote_actor_is_idempotent(pool: sqlx::PgPool) { + let repo = PgActivityPubRepository::new(pool); + let url = url::Url::parse("https://mastodon.social/users/alice").unwrap(); + let id1 = repo.intern_remote_actor(&url).await.unwrap(); + let id2 = repo.intern_remote_actor(&url).await.unwrap(); + assert_eq!(id1, id2); + } + + #[sqlx::test(migrations = "./migrations")] + async fn accept_and_retract_note(pool: sqlx::PgPool) { + let repo = PgActivityPubRepository::new(pool); + let actor_url = url::Url::parse("https://remote.example/users/bob").unwrap(); + let ap_id = url::Url::parse("https://remote.example/notes/1").unwrap(); + let author = repo.intern_remote_actor(&actor_url).await.unwrap(); + repo.accept_note(&ap_id, &author, "hello from remote", chrono::Utc::now(), false, None) + .await + .unwrap(); + repo.retract_note(&ap_id).await.unwrap(); + } + + #[sqlx::test(migrations = "./migrations")] + async fn count_local_notes_excludes_remote(pool: sqlx::PgPool) { + let repo = PgActivityPubRepository::new(pool); + assert_eq!(repo.count_local_notes().await.unwrap(), 0); + } +} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 0befdcd..0c479d9 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -1,3 +1,4 @@ +pub mod activitypub; pub mod api_key; pub mod block; pub mod boost;