From 4ae3af8086728ab820758fd108c8d75eccc3ca0e Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 10:58:25 +0200 Subject: [PATCH] refactor(activitypub): ThoughtsObjectHandler uses ActivityPubRepository port, drops postgres dep --- crates/adapters/activitypub/Cargo.toml | 2 - crates/adapters/activitypub/src/handler.rs | 170 +++++++++------------ 2 files changed, 73 insertions(+), 99 deletions(-) diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml index 94e3001..1feca10 100644 --- a/crates/adapters/activitypub/Cargo.toml +++ b/crates/adapters/activitypub/Cargo.toml @@ -7,8 +7,6 @@ edition = "2021" activitypub-base = { workspace = true } activitypub_federation = "0.7.0-beta.11" domain = { workspace = true } -postgres = { workspace = true } -sqlx = { workspace = true } url = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/adapters/activitypub/src/handler.rs b/crates/adapters/activitypub/src/handler.rs index fc45c91..b88317b 100644 --- a/crates/adapters/activitypub/src/handler.rs +++ b/crates/adapters/activitypub/src/handler.rs @@ -1,137 +1,113 @@ +use std::sync::Arc; use anyhow::{anyhow, Result}; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use sqlx::PgPool; use url::Url; use activitypub_base::ApObjectHandler; +use domain::ports::ActivityPubRepository; +use domain::value_objects::UserId; use crate::note::ThoughtNote; use crate::urls::ThoughtsUrls; pub struct ThoughtsObjectHandler { - pool: PgPool, + repo: Arc, urls: ThoughtsUrls, } impl ThoughtsObjectHandler { - pub fn new(pool: PgPool, base_url: &str) -> Self { - Self { pool, urls: ThoughtsUrls::new(base_url) } + pub fn new(repo: Arc, base_url: &str) -> Self { + Self { repo, urls: ThoughtsUrls::new(base_url) } } } #[async_trait] impl ApObjectHandler for ThoughtsObjectHandler { - async fn get_local_objects_for_user(&self, user_id: uuid::Uuid) -> Result> { - #[derive(sqlx::FromRow)] - struct Row { id: uuid::Uuid, content: String, created_at: DateTime, in_reply_to_id: Option, content_warning: Option, sensitive: bool, username: String } - let rows = sqlx::query_as::<_, Row>( - "SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username - FROM thoughts t JOIN users u ON u.id=t.user_id - WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'" - ).bind(user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e))?; - - let mut result = Vec::new(); - for r in rows { - let note_url = self.urls.thought_url(r.id); - let actor_url = self.urls.user_url(&r.username); - let followers_url = self.urls.user_followers(&r.username); - let in_reply_to = r.in_reply_to_id.map(|id| self.urls.thought_url(id)); - let note = ThoughtNote::new_public(note_url.clone(), actor_url, r.content, r.created_at, in_reply_to, r.sensitive, r.content_warning, followers_url); - result.push((note_url, serde_json::to_value(¬e)?)); - } - Ok(result) + async fn get_local_objects_for_user( + &self, + user_id: uuid::Uuid, + ) -> Result> { + let uid = UserId::from_uuid(user_id); + let entries = self.repo.outbox_entries_for_actor(&uid).await + .map_err(|e| anyhow!("{e}"))?; + entries.into_iter().map(|e| { + let note_url = self.urls.thought_url(e.thought.id.as_uuid()); + let actor_url = self.urls.user_url(e.author_username.as_str()); + let followers = self.urls.user_followers(e.author_username.as_str()); + let in_reply_to = e.thought.in_reply_to_id.map(|id| self.urls.thought_url(id.as_uuid())); + let note = ThoughtNote::new_public( + note_url.clone(), actor_url, + e.thought.content.as_str().to_owned(), + e.thought.created_at, in_reply_to, + e.thought.sensitive, e.thought.content_warning, followers, + ); + Ok((note_url, serde_json::to_value(¬e)?)) + }).collect() } async fn get_local_objects_page( - &self, user_id: uuid::Uuid, before: Option>, limit: usize, + &self, + user_id: uuid::Uuid, + before: Option>, + limit: usize, ) -> Result)>> { - #[derive(sqlx::FromRow)] - struct Row { id: uuid::Uuid, content: String, created_at: DateTime, in_reply_to_id: Option, content_warning: Option, sensitive: bool, username: String } - let rows = if let Some(before) = before { - sqlx::query_as::<_, Row>( - "SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username - FROM thoughts t JOIN users u ON u.id=t.user_id - WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2 - ORDER BY t.created_at DESC LIMIT $3" - ).bind(user_id).bind(before).bind(limit as i64).fetch_all(&self.pool).await - } else { - sqlx::query_as::<_, Row>( - "SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username - FROM thoughts t JOIN users u ON u.id=t.user_id - WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' - ORDER BY t.created_at DESC LIMIT $2" - ).bind(user_id).bind(limit as i64).fetch_all(&self.pool).await - }.map_err(|e| anyhow!(e))?; - - let mut result = Vec::new(); - for r in rows { - let note_url = self.urls.thought_url(r.id); - let actor_url = self.urls.user_url(&r.username); - let followers_url = self.urls.user_followers(&r.username); - let in_reply_to = r.in_reply_to_id.map(|id| self.urls.thought_url(id)); - let note = ThoughtNote::new_public(note_url.clone(), actor_url, r.content.clone(), r.created_at, in_reply_to, r.sensitive, r.content_warning, followers_url); - result.push((note_url, serde_json::to_value(¬e)?, r.created_at)); - } - Ok(result) + let uid = UserId::from_uuid(user_id); + let entries = self.repo.outbox_page_for_actor(&uid, before, limit).await + .map_err(|e| anyhow!("{e}"))?; + entries.into_iter().map(|e| { + let created_at = e.thought.created_at; + let note_url = self.urls.thought_url(e.thought.id.as_uuid()); + let actor_url = self.urls.user_url(e.author_username.as_str()); + let followers = self.urls.user_followers(e.author_username.as_str()); + let in_reply_to = e.thought.in_reply_to_id.map(|id| self.urls.thought_url(id.as_uuid())); + let note = ThoughtNote::new_public( + note_url.clone(), actor_url, + e.thought.content.as_str().to_owned(), + created_at, in_reply_to, + e.thought.sensitive, e.thought.content_warning, followers, + ); + Ok((note_url, serde_json::to_value(¬e)?, created_at)) + }).collect() } - async fn on_create(&self, ap_id: &Url, actor_url: &Url, object: serde_json::Value) -> Result<()> { + async fn on_create( + &self, + ap_id: &Url, + actor_url: &Url, + object: serde_json::Value, + ) -> Result<()> { let note: ThoughtNote = serde_json::from_value(object)?; - let actor_url_str = actor_url.to_string(); - - // Find or create a remote user placeholder - let existing: Option = sqlx::query_scalar( - "SELECT id FROM users WHERE ap_id=$1" - ).bind(&actor_url_str).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e))?; - - let user_id = match existing { - Some(id) => id, - None => { - let uid = uuid::Uuid::new_v4(); - let handle = actor_url.path().trim_start_matches('/').replace('/', "_"); - sqlx::query( - "INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at) - VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT DO NOTHING" - ).bind(uid).bind(&handle).bind(format!("{}@remote", uid)).bind(&actor_url_str) - .execute(&self.pool).await.map_err(|e| anyhow!(e))?; - uid - } - }; - - let thought_id = uuid::Uuid::new_v4(); - let content: String = note.content.chars().take(500).collect(); - sqlx::query( - "INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at) - VALUES($1,$2,$3,$4,'public',$5,false,$6,$7) ON CONFLICT(ap_id) DO NOTHING" - ).bind(thought_id).bind(user_id).bind(&content).bind(ap_id.as_str()) - .bind(note.sensitive).bind(note.summary).bind(note.published) - .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + let author_id = self.repo.intern_remote_actor(actor_url).await + .map_err(|e| anyhow!("{e}"))?; + self.repo.accept_note( + ap_id, &author_id, + ¬e.content, + note.published, + note.sensitive, + note.summary, + ).await.map_err(|e| anyhow!("{e}")) } - async fn on_update(&self, ap_id: &Url, _actor_url: &Url, object: serde_json::Value) -> Result<()> { + async fn on_update( + &self, + ap_id: &Url, + _actor_url: &Url, + object: serde_json::Value, + ) -> Result<()> { let note: ThoughtNote = serde_json::from_value(object)?; - let content: String = note.content.chars().take(500).collect(); - sqlx::query("UPDATE thoughts SET content=$2, updated_at=NOW() WHERE ap_id=$1") - .bind(ap_id.as_str()).bind(&content) - .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + self.repo.apply_note_update(ap_id, ¬e.content).await + .map_err(|e| anyhow!("{e}")) } async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> { - sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false") - .bind(ap_id.as_str()) - .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + self.repo.retract_note(ap_id).await.map_err(|e| anyhow!("{e}")) } async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> { - sqlx::query( - "DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)" - ).bind(actor_url.as_str()) - .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + self.repo.retract_actor_notes(actor_url).await.map_err(|e| anyhow!("{e}")) } async fn count_local_posts(&self) -> Result { - let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thoughts WHERE local=true") - .fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; - Ok(n as u64) + self.repo.count_local_notes().await.map_err(|e| anyhow!("{e}")) } }