use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; use sqlx::PgPool; trait IntoAnyhow { fn into_anyhow(self) -> Result; } impl IntoAnyhow for std::result::Result { fn into_anyhow(self) -> Result { self.map_err(|e| anyhow::anyhow!(e)) } } use k_ap::{ ActivityRepository, ActorRepository, ApActorType, ApProfileField, ApUser, ApUserRepository, BlockedDomain, BlocklistRepository, FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, }; // ── PostgresFederationRepository ───────────────────────────────────────────── pub struct PgFederationRepository { pool: PgPool, } impl PgFederationRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } fn status_str(s: &FollowerStatus) -> &'static str { match s { FollowerStatus::Pending => "pending", FollowerStatus::Accepted => "accepted", FollowerStatus::Rejected => "rejected", } } fn str_status(s: &str) -> FollowerStatus { match s { "accepted" => FollowerStatus::Accepted, "rejected" => FollowerStatus::Rejected, _ => FollowerStatus::Pending, } } #[derive(sqlx::FromRow)] struct RemoteActorRow { url: String, handle: String, inbox_url: String, shared_inbox_url: Option, display_name: Option, avatar_url: Option, outbox_url: Option, bio: Option, banner_url: Option, followers_url: Option, following_url: Option, also_known_as: Option>, } fn map_remote_actor(r: RemoteActorRow) -> RemoteActor { RemoteActor { url: r.url, handle: r.handle, inbox_url: r.inbox_url, shared_inbox_url: r.shared_inbox_url, display_name: r.display_name, avatar_url: r.avatar_url, outbox_url: r.outbox_url, bio: r.bio, banner_url: r.banner_url, followers_url: r.followers_url, following_url: r.following_url, also_known_as: r.also_known_as.unwrap_or_default(), } } // ── ActivityRepository ──────────────────────────────────────────────────────── #[async_trait] impl ActivityRepository for PgFederationRepository { async fn is_activity_processed(&self, activity_id: &str) -> Result { let n: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM federation_processed_activities WHERE activity_id=$1", ) .bind(activity_id) .fetch_one(&self.pool) .await .into_anyhow()?; Ok(n > 0) } async fn mark_activity_processed(&self, activity_id: &str) -> Result<()> { sqlx::query( "INSERT INTO federation_processed_activities(activity_id) VALUES($1) ON CONFLICT DO NOTHING", ) .bind(activity_id) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } } // ── FollowRepository ────────────────────────────────────────────────────────── #[async_trait] impl FollowRepository for PgFederationRepository { async fn add_follower( &self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus, follow_activity_id: &str, ) -> Result<()> { sqlx::query( "INSERT INTO federation_followers(local_user_id,remote_actor_url,status,follow_activity_id) VALUES($1,$2,$3,$4) ON CONFLICT(local_user_id,remote_actor_url) DO UPDATE SET status=EXCLUDED.status, follow_activity_id=EXCLUDED.follow_activity_id", ) .bind(local_user_id) .bind(remote_actor_url) .bind(status_str(&status)) .bind(follow_activity_id) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn get_follower_follow_activity_id( &self, local_user_id: uuid::Uuid, remote_actor_url: &str, ) -> Result> { sqlx::query_scalar::<_, String>( "SELECT follow_activity_id FROM federation_followers WHERE local_user_id=$1 AND remote_actor_url=$2", ) .bind(local_user_id) .bind(remote_actor_url) .fetch_optional(&self.pool) .await .into_anyhow() } async fn remove_follower( &self, local_user_id: uuid::Uuid, remote_actor_url: &str, ) -> Result<()> { sqlx::query( "DELETE FROM federation_followers WHERE local_user_id=$1 AND remote_actor_url=$2", ) .bind(local_user_id) .bind(remote_actor_url) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result> { #[derive(sqlx::FromRow)] struct Row { #[sqlx(flatten)] actor: RemoteActorRow, status: String, } sqlx::query_as::<_, Row>( "SELECT f.remote_actor_url AS url, f.status, COALESCE(r.handle,'') AS handle, COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url, r.bio, r.banner_url, r.followers_url, r.following_url, r.also_known_as FROM federation_followers f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url WHERE f.local_user_id=$1 AND f.status='accepted'", ) .bind(local_user_id) .fetch_all(&self.pool) .await .into_anyhow() .map(|rows| { rows.into_iter() .map(|r| Follower { actor: map_remote_actor(r.actor), status: str_status(&r.status), }) .collect() }) } async fn get_followers_page( &self, local_user_id: uuid::Uuid, offset: u32, limit: usize, ) -> Result> { #[derive(sqlx::FromRow)] struct Row { #[sqlx(flatten)] actor: RemoteActorRow, status: String, } sqlx::query_as::<_, Row>( "SELECT f.remote_actor_url AS url, f.status, COALESCE(r.handle,'') AS handle, COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url, r.bio, r.banner_url, r.followers_url, r.following_url, r.also_known_as FROM federation_followers f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url WHERE f.local_user_id=$1 AND f.status='accepted' ORDER BY f.created_at DESC LIMIT $2 OFFSET $3", ) .bind(local_user_id) .bind(limit as i64) .bind(offset as i64) .fetch_all(&self.pool) .await .into_anyhow() .map(|rows| { rows.into_iter() .map(|r| Follower { actor: map_remote_actor(r.actor), status: str_status(&r.status), }) .collect() }) } async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result { let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM federation_followers WHERE local_user_id=$1") .bind(local_user_id) .fetch_one(&self.pool) .await .into_anyhow()?; Ok(n as usize) } async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result { let n: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM federation_followers WHERE local_user_id=$1 AND status='accepted'", ) .bind(local_user_id) .fetch_one(&self.pool) .await .into_anyhow()?; Ok(n as usize) } async fn get_accepted_followers_page( &self, local_user_id: uuid::Uuid, offset: u32, limit: usize, ) -> Result> { sqlx::query_as::<_, RemoteActorRow>( "SELECT f.remote_actor_url AS url, COALESCE(r.handle,'') AS handle, COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url, r.bio, r.banner_url, r.followers_url, r.following_url, r.also_known_as FROM federation_followers f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url WHERE f.local_user_id=$1 AND f.status='accepted' ORDER BY f.created_at DESC LIMIT $2 OFFSET $3", ) .bind(local_user_id) .bind(limit as i64) .bind(offset as i64) .fetch_all(&self.pool) .await .into_anyhow() .map(|rows| rows.into_iter().map(map_remote_actor).collect()) } async fn get_accepted_follower_inboxes( &self, local_user_id: uuid::Uuid, ) -> Result> { let rows: Vec = sqlx::query_scalar( "SELECT DISTINCT COALESCE(r.shared_inbox_url, r.inbox_url) FROM federation_followers f JOIN remote_actors r ON r.url = f.remote_actor_url WHERE f.local_user_id = $1 AND f.status = 'accepted' AND f.remote_actor_url NOT IN ( SELECT actor_url FROM federation_blocked_actors WHERE local_user_id = $1 ) AND SUBSTRING(f.remote_actor_url FROM 'https?://([^/]+)') NOT IN (SELECT domain FROM federation_blocked_domains) AND COALESCE(r.shared_inbox_url, r.inbox_url) IS NOT NULL AND COALESCE(r.shared_inbox_url, r.inbox_url) <> ''", ) .bind(local_user_id) .fetch_all(&self.pool) .await .into_anyhow()?; Ok(rows) } async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { sqlx::query_as::<_, RemoteActorRow>( "SELECT f.remote_actor_url AS url, COALESCE(r.handle,'') AS handle, COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url, r.bio, r.banner_url, r.followers_url, r.following_url, r.also_known_as FROM federation_followers f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url WHERE f.local_user_id=$1 AND f.status='pending'", ) .bind(local_user_id) .fetch_all(&self.pool) .await .into_anyhow() .map(|rows| rows.into_iter().map(map_remote_actor).collect()) } async fn update_follower_status( &self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus, ) -> Result<()> { sqlx::query( "UPDATE federation_followers SET status=$3 WHERE local_user_id=$1 AND remote_actor_url=$2", ) .bind(local_user_id) .bind(remote_actor_url) .bind(status_str(&status)) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn add_following( &self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str, ) -> Result<()> { self.upsert_remote_actor(actor.clone()).await?; sqlx::query( "INSERT INTO federation_following(local_user_id,remote_actor_url,follow_activity_id,outbox_url) VALUES($1,$2,$3,$4) ON CONFLICT(local_user_id,remote_actor_url) DO UPDATE SET follow_activity_id=EXCLUDED.follow_activity_id", ) .bind(local_user_id) .bind(&actor.url) .bind(follow_activity_id) .bind(&actor.outbox_url) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn get_follow_activity_id( &self, local_user_id: uuid::Uuid, remote_actor_url: &str, ) -> Result> { sqlx::query_scalar::<_, String>( "SELECT follow_activity_id FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2", ) .bind(local_user_id) .bind(remote_actor_url) .fetch_optional(&self.pool) .await .into_anyhow() } async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { sqlx::query( "DELETE FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2", ) .bind(local_user_id) .bind(actor_url) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn get_following(&self, local_user_id: uuid::Uuid) -> Result> { sqlx::query_as::<_, RemoteActorRow>( "SELECT f.remote_actor_url AS url, COALESCE(r.handle,'') AS handle, COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url, r.bio, r.banner_url, r.followers_url, r.following_url, r.also_known_as FROM federation_following f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url WHERE f.local_user_id=$1", ) .bind(local_user_id) .fetch_all(&self.pool) .await .into_anyhow() .map(|rows| rows.into_iter().map(map_remote_actor).collect()) } async fn get_following_page( &self, local_user_id: uuid::Uuid, offset: u32, limit: usize, ) -> Result> { sqlx::query_as::<_, RemoteActorRow>( "SELECT f.remote_actor_url AS url, COALESCE(r.handle,'') AS handle, COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url, r.bio, r.banner_url, r.followers_url, r.following_url, r.also_known_as FROM federation_following f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url WHERE f.local_user_id=$1 ORDER BY f.created_at DESC LIMIT $2 OFFSET $3", ) .bind(local_user_id) .bind(limit as i64) .bind(offset as i64) .fetch_all(&self.pool) .await .into_anyhow() .map(|rows| rows.into_iter().map(map_remote_actor).collect()) } async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM federation_following WHERE local_user_id=$1") .bind(local_user_id) .fetch_one(&self.pool) .await .into_anyhow()?; Ok(n as usize) } async fn update_following_status( &self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowingStatus, ) -> Result<()> { let s = match status { FollowingStatus::Pending => "pending", FollowingStatus::Accepted => "accepted", }; sqlx::query( "UPDATE federation_following SET status=$3 WHERE local_user_id=$1 AND remote_actor_url=$2", ) .bind(local_user_id) .bind(remote_actor_url) .bind(s) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn get_following_outbox_url( &self, local_user_id: uuid::Uuid, remote_actor_url: &str, ) -> Result> { sqlx::query_scalar::<_, String>( "SELECT outbox_url FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2", ) .bind(local_user_id) .bind(remote_actor_url) .fetch_optional(&self.pool) .await .into_anyhow() } async fn migrate_follower_actor( &self, old_actor_url: &str, new_actor_url: &str, ) -> Result> { let mut tx = self.pool.begin().await.into_anyhow()?; let affected: Vec = sqlx::query_scalar( "INSERT INTO federation_following(local_user_id, remote_actor_url, follow_activity_id, outbox_url) SELECT local_user_id, $2, follow_activity_id, outbox_url FROM federation_following WHERE remote_actor_url = $1 ON CONFLICT (local_user_id, remote_actor_url) DO NOTHING RETURNING local_user_id", ) .bind(old_actor_url) .bind(new_actor_url) .fetch_all(&mut *tx) .await .into_anyhow()?; sqlx::query("DELETE FROM federation_following WHERE remote_actor_url = $1") .bind(old_actor_url) .execute(&mut *tx) .await .into_anyhow()?; tx.commit().await.into_anyhow()?; Ok(affected) } } // ── ActorRepository ─────────────────────────────────────────────────────────── #[async_trait] impl ActorRepository for PgFederationRepository { async fn get_local_actor_keypair( &self, user_id: uuid::Uuid, ) -> Result> { #[derive(sqlx::FromRow)] struct Row { public_key: Option, private_key: Option, } let row = sqlx::query_as::<_, Row>( "SELECT public_key, private_key FROM users WHERE id=$1 AND local=true", ) .bind(user_id) .fetch_optional(&self.pool) .await .into_anyhow()?; Ok(row.and_then(|r| match (r.public_key, r.private_key) { (Some(pub_k), Some(priv_k)) => Some((pub_k, priv_k)), _ => None, })) } async fn save_local_actor_keypair( &self, user_id: uuid::Uuid, public_key: String, private_key: String, ) -> Result<()> { sqlx::query("UPDATE users SET public_key=$2, private_key=$3, updated_at=NOW() WHERE id=$1") .bind(user_id) .bind(&public_key) .bind(&private_key) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { let also_known_as: Option> = if actor.also_known_as.is_empty() { None } else { Some(actor.also_known_as) }; sqlx::query( "INSERT INTO remote_actors(url,handle,display_name,inbox_url,shared_inbox_url,public_key, avatar_url,outbox_url,bio,banner_url,followers_url,following_url,also_known_as,last_fetched_at) VALUES($1,$2,$3,$4,$5,'',$6,$7,$8,$9,$10,$11,$12,NOW()) ON CONFLICT(url) DO UPDATE SET handle=EXCLUDED.handle, display_name=EXCLUDED.display_name, inbox_url=EXCLUDED.inbox_url, shared_inbox_url=EXCLUDED.shared_inbox_url, avatar_url=EXCLUDED.avatar_url, outbox_url=EXCLUDED.outbox_url, bio=EXCLUDED.bio, banner_url=EXCLUDED.banner_url, followers_url=EXCLUDED.followers_url, following_url=EXCLUDED.following_url, also_known_as=EXCLUDED.also_known_as, last_fetched_at=NOW()", ) .bind(&actor.url) .bind(&actor.handle) .bind(&actor.display_name) .bind(&actor.inbox_url) .bind(&actor.shared_inbox_url) .bind(&actor.avatar_url) .bind(&actor.outbox_url) .bind(&actor.bio) .bind(&actor.banner_url) .bind(&actor.followers_url) .bind(&actor.following_url) .bind(also_known_as.as_deref()) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn get_remote_actor(&self, actor_url: &str) -> Result> { sqlx::query_as::<_, RemoteActorRow>( "SELECT url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url, bio, banner_url, followers_url, following_url, also_known_as FROM remote_actors WHERE url=$1", ) .bind(actor_url) .fetch_optional(&self.pool) .await .into_anyhow() .map(|o| o.map(map_remote_actor)) } async fn add_announce( &self, activity_id: &str, object_url: &str, actor_url: &str, announced_at: DateTime, ) -> Result<()> { sqlx::query( "INSERT INTO federation_announces(activity_id,object_url,actor_url,announced_at) VALUES($1,$2,$3,$4) ON CONFLICT(activity_id) DO NOTHING", ) .bind(activity_id) .bind(object_url) .bind(actor_url) .bind(announced_at) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn remove_announce(&self, activity_id: &str, actor_url: &str) -> Result<()> { sqlx::query("DELETE FROM federation_announces WHERE activity_id=$1 AND actor_url=$2") .bind(activity_id) .bind(actor_url) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn count_announces(&self, object_url: &str) -> Result { let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM federation_announces WHERE object_url=$1") .bind(object_url) .fetch_one(&self.pool) .await .into_anyhow()?; Ok(n as usize) } } // ── BlocklistRepository ─────────────────────────────────────────────────────── #[async_trait] impl BlocklistRepository for PgFederationRepository { async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()> { sqlx::query( "INSERT INTO federation_blocked_domains(domain,reason) VALUES($1,$2) ON CONFLICT(domain) DO NOTHING", ) .bind(domain) .bind(reason) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn remove_blocked_domain(&self, domain: &str) -> Result<()> { sqlx::query("DELETE FROM federation_blocked_domains WHERE domain=$1") .bind(domain) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn get_blocked_domains(&self) -> Result> { #[derive(sqlx::FromRow)] struct Row { domain: String, reason: Option, blocked_at: DateTime, } sqlx::query_as::<_, Row>( "SELECT domain,reason,blocked_at FROM federation_blocked_domains ORDER BY domain", ) .fetch_all(&self.pool) .await .into_anyhow() .map(|rows| { rows.into_iter() .map(|r| BlockedDomain { domain: r.domain, reason: r.reason, blocked_at: r.blocked_at.to_rfc3339(), }) .collect() }) } async fn is_domain_blocked(&self, domain: &str) -> Result { let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM federation_blocked_domains WHERE domain=$1") .bind(domain) .fetch_one(&self.pool) .await .into_anyhow()?; Ok(n > 0) } async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { sqlx::query( "INSERT INTO federation_blocked_actors(local_user_id,actor_url) VALUES($1,$2) ON CONFLICT DO NOTHING", ) .bind(local_user_id) .bind(actor_url) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { sqlx::query("DELETE FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2") .bind(local_user_id) .bind(actor_url) .execute(&self.pool) .await .into_anyhow() .map(|_| ()) } async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result> { sqlx::query_scalar::<_, String>( "SELECT actor_url FROM federation_blocked_actors WHERE local_user_id=$1 ORDER BY created_at DESC", ) .bind(local_user_id) .fetch_all(&self.pool) .await .into_anyhow() } async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result { let n: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2", ) .bind(local_user_id) .bind(actor_url) .fetch_one(&self.pool) .await .into_anyhow()?; Ok(n > 0) } } // ── PostgresApUserRepository ────────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct UserRow { id: uuid::Uuid, username: String, display_name: Option, bio: Option, avatar_url: Option, header_url: Option, also_known_as: Option, profile_fields: Option, } pub struct PgApUserRepository { pool: PgPool, base_url: String, } impl PgApUserRepository { pub fn new(pool: PgPool, base_url: String) -> Self { Self { pool, base_url } } fn row_to_ap_user(&self, r: UserRow) -> ApUser { let profile_url = url::Url::parse(&format!("{}/users/{}", self.base_url, r.id)).ok(); let avatar_url = r.avatar_url.and_then(|u| url::Url::parse(&u).ok()); let banner_url = r.header_url.and_then(|u| url::Url::parse(&u).ok()); let attachment = r .profile_fields .and_then(|v| v.as_array().cloned()) .map(|arr| { arr.into_iter() .filter_map(|item| { let name = item.get("name")?.as_str()?.to_string(); let value = item.get("value")?.as_str()?.to_string(); Some(ApProfileField { name, value }) }) .collect() }) .unwrap_or_default(); ApUser { id: r.id, username: r.username, display_name: r.display_name, bio: r.bio, avatar_url, banner_url, also_known_as: r.also_known_as.into_iter().collect(), profile_url, attachment, manually_approves_followers: true, discoverable: true, actor_type: ApActorType::default(), featured_url: None, } } } #[async_trait] impl ApUserRepository for PgApUserRepository { async fn find_by_id(&self, id: uuid::Uuid) -> Result> { let row = sqlx::query_as::<_, UserRow>( "SELECT id,username,display_name,bio,avatar_url,header_url,also_known_as,profile_fields FROM users WHERE id=$1 AND local=true", ) .bind(id) .fetch_optional(&self.pool) .await .into_anyhow()?; Ok(row.map(|r| self.row_to_ap_user(r))) } async fn find_by_username(&self, username: &str) -> Result> { let row = sqlx::query_as::<_, UserRow>( "SELECT id,username,display_name,bio,avatar_url,header_url,also_known_as,profile_fields FROM users WHERE username=$1 AND local=true", ) .bind(username) .fetch_optional(&self.pool) .await .into_anyhow()?; Ok(row.map(|r| self.row_to_ap_user(r))) } async fn count_users(&self) -> Result { let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE local=true") .fetch_one(&self.pool) .await .into_anyhow()?; Ok(n as usize) } }