use async_trait::async_trait; use chrono::{DateTime, Utc}; use sqlx::PgPool; use url::Url; use domain::{ errors::DomainError, models::thought::{Thought, Visibility}, ports::{ActivityPubRepository, OutboxEntry}, value_objects::{Content, ThoughtId, UserId, Username}, }; pub struct PgActivityPubRepository { pool: PgPool, } impl PgActivityPubRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl ActivityPubRepository for PgActivityPubRepository { async fn outbox_entries_for_actor(&self, user_id: &UserId) -> Result, DomainError> { #[derive(sqlx::FromRow)] struct Row { id: uuid::Uuid, user_id: uuid::Uuid, content: String, created_at: DateTime, in_reply_to_id: Option, content_warning: Option, sensitive: bool, username: String, updated_at: Option>, } sqlx::query_as::<_, Row>( "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at FROM thoughts t JOIN users u ON u.id=t.user_id WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' ORDER BY t.created_at DESC", ) .bind(user_id.as_uuid()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string())) .map(|rows| { rows.into_iter() .map(|r| OutboxEntry { thought: Thought { id: ThoughtId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), content: Content::new_remote(r.content), in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), in_reply_to_url: None, ap_id: None, visibility: Visibility::Public, content_warning: r.content_warning, sensitive: r.sensitive, local: true, created_at: r.created_at, updated_at: r.updated_at, }, author_username: Username::from_trusted(r.username), }) .collect() }) } async fn outbox_page_for_actor( &self, user_id: &UserId, before: Option>, limit: usize, ) -> Result, DomainError> { #[derive(sqlx::FromRow)] struct Row { id: uuid::Uuid, user_id: uuid::Uuid, content: String, created_at: DateTime, in_reply_to_id: Option, content_warning: Option, sensitive: bool, username: String, updated_at: Option>, } let rows = if let Some(before) = before { sqlx::query_as::<_, Row>( "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at FROM thoughts t JOIN users u ON u.id=t.user_id WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2 ORDER BY t.created_at DESC LIMIT $3", ) .bind(user_id.as_uuid()) .bind(before) .bind(limit as i64) .fetch_all(&self.pool) .await } else { sqlx::query_as::<_, Row>( "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at FROM thoughts t JOIN users u ON u.id=t.user_id WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' ORDER BY t.created_at DESC LIMIT $2", ) .bind(user_id.as_uuid()) .bind(limit as i64) .fetch_all(&self.pool) .await } .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(rows .into_iter() .map(|r| OutboxEntry { thought: Thought { id: ThoughtId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), content: Content::new_remote(r.content), in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), in_reply_to_url: None, ap_id: None, visibility: Visibility::Public, content_warning: r.content_warning, sensitive: r.sensitive, local: true, created_at: r.created_at, updated_at: r.updated_at, }, author_username: Username::from_trusted(r.username), }) .collect()) } async fn find_remote_actor_id(&self, actor_ap_url: &Url) -> Result, DomainError> { sqlx::query_scalar::<_, uuid::Uuid>("SELECT id FROM users WHERE ap_id=$1") .bind(actor_ap_url.as_str()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string())) .map(|o| o.map(UserId::from_uuid)) } async fn intern_remote_actor(&self, actor_ap_url: &Url) -> Result { if let Some(id) = self.find_remote_actor_id(actor_ap_url).await? { return Ok(id); } let new_id = uuid::Uuid::new_v4(); let handle = actor_ap_url.path().trim_start_matches('/').replace('/', "_"); sqlx::query( "INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at) VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT(ap_id) DO NOTHING", ) .bind(new_id) .bind(&handle) .bind(format!("{}@remote", new_id)) .bind(actor_ap_url.as_str()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; // Re-fetch to get whichever id won the race self.find_remote_actor_id(actor_ap_url) .await? .ok_or_else(|| DomainError::Internal("intern_remote_actor: insert succeeded but row not found".into())) } async fn accept_note( &self, ap_id: &Url, author_id: &UserId, content: &str, published: DateTime, sensitive: bool, content_warning: Option, ) -> Result<(), DomainError> { let capped: String = content.chars().take(500).collect(); sqlx::query( "INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at) VALUES($1,$2,$3,$4,'public',$5,false,$6,$7) ON CONFLICT(ap_id) DO NOTHING", ) .bind(uuid::Uuid::new_v4()) .bind(author_id.as_uuid()) .bind(&capped) .bind(ap_id.as_str()) .bind(sensitive) .bind(content_warning) .bind(published) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string())) .map(|_| ()) } async fn apply_note_update(&self, ap_id: &Url, new_content: &str) -> Result<(), DomainError> { let capped: String = new_content.chars().take(500).collect(); sqlx::query("UPDATE thoughts SET content=$2,updated_at=NOW() WHERE ap_id=$1 AND local=false") .bind(ap_id.as_str()) .bind(&capped) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string())) .map(|_| ()) } async fn retract_note(&self, ap_id: &Url) -> Result<(), DomainError> { sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false") .bind(ap_id.as_str()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string())) .map(|_| ()) } async fn retract_actor_notes(&self, actor_ap_url: &Url) -> Result<(), DomainError> { sqlx::query( "DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)", ) .bind(actor_ap_url.as_str()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string())) .map(|_| ()) } async fn count_local_notes(&self) -> Result { let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thoughts WHERE local=true") .fetch_one(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(n as u64) } } #[cfg(test)] mod tests { use super::*; use domain::ports::ActivityPubRepository; #[sqlx::test(migrations = "./migrations")] async fn intern_remote_actor_is_idempotent(pool: sqlx::PgPool) { let repo = PgActivityPubRepository::new(pool); let url = url::Url::parse("https://mastodon.social/users/alice").unwrap(); let id1 = repo.intern_remote_actor(&url).await.unwrap(); let id2 = repo.intern_remote_actor(&url).await.unwrap(); assert_eq!(id1, id2); } #[sqlx::test(migrations = "./migrations")] async fn accept_and_retract_note(pool: sqlx::PgPool) { let repo = PgActivityPubRepository::new(pool); let actor_url = url::Url::parse("https://remote.example/users/bob").unwrap(); let ap_id = url::Url::parse("https://remote.example/notes/1").unwrap(); let author = repo.intern_remote_actor(&actor_url).await.unwrap(); repo.accept_note(&ap_id, &author, "hello from remote", chrono::Utc::now(), false, None) .await .unwrap(); repo.retract_note(&ap_id).await.unwrap(); } #[sqlx::test(migrations = "./migrations")] async fn count_local_notes_excludes_remote(pool: sqlx::PgPool) { let repo = PgActivityPubRepository::new(pool); assert_eq!(repo.count_local_notes().await.unwrap(), 0); } }