From 21b6a04f97ad6c13388ee42921b36bdc3bb8d468 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 10:19:57 +0200 Subject: [PATCH] feat(postgres-federation): FederationRepository and ApUserRepository --- .../adapters/postgres-federation/Cargo.toml | 14 + .../adapters/postgres-federation/src/lib.rs | 362 ++++++++++++++++++ .../migrations/005_federation_tables.sql | 54 +++ 3 files changed, 430 insertions(+) create mode 100644 crates/adapters/postgres/migrations/005_federation_tables.sql diff --git a/crates/adapters/postgres-federation/Cargo.toml b/crates/adapters/postgres-federation/Cargo.toml index 0c23227..55ab7a2 100644 --- a/crates/adapters/postgres-federation/Cargo.toml +++ b/crates/adapters/postgres-federation/Cargo.toml @@ -2,3 +2,17 @@ name = "postgres-federation" version = "0.1.0" edition = "2021" + +[dependencies] +activitypub-base = { workspace = true } +sqlx = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } +anyhow = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } +sqlx = { workspace = true, features = ["migrate"] } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index e69de29..3cc21bd 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -0,0 +1,362 @@ +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; + +use activitypub_base::{ + ApUser, ApUserRepository, + BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, +}; + +// ── PostgresFederationRepository ───────────────────────────────────────────── + +pub struct PostgresFederationRepository { + pool: PgPool, +} + +impl PostgresFederationRepository { + pub fn new(pool: PgPool) -> Self { Self { pool } } +} + +fn status_str(s: &FollowerStatus) -> &'static str { + match s { FollowerStatus::Pending => "pending", FollowerStatus::Accepted => "accepted", FollowerStatus::Rejected => "rejected" } +} +fn str_status(s: &str) -> FollowerStatus { + match s { "accepted" => FollowerStatus::Accepted, "rejected" => FollowerStatus::Rejected, _ => FollowerStatus::Pending } +} + +fn map_remote_actor( + url: String, handle: String, inbox_url: String, + shared_inbox_url: Option, display_name: Option, + avatar_url: Option, outbox_url: Option, +) -> RemoteActor { + RemoteActor { url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url } +} + +#[async_trait] +impl FederationRepository for PostgresFederationRepository { + async fn add_follower( + &self, local_user_id: uuid::Uuid, remote_actor_url: &str, + status: FollowerStatus, follow_activity_id: &str, + ) -> Result<()> { + sqlx::query( + "INSERT INTO federation_followers(local_user_id,remote_actor_url,status,follow_activity_id) + VALUES($1,$2,$3,$4) + ON CONFLICT(local_user_id,remote_actor_url) DO UPDATE + SET status=EXCLUDED.status, follow_activity_id=EXCLUDED.follow_activity_id" + ) + .bind(local_user_id).bind(remote_actor_url).bind(status_str(&status)).bind(follow_activity_id) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn get_follower_follow_activity_id( + &self, local_user_id: uuid::Uuid, remote_actor_url: &str, + ) -> Result> { + sqlx::query_scalar::<_, String>( + "SELECT follow_activity_id FROM federation_followers WHERE local_user_id=$1 AND remote_actor_url=$2" + ).bind(local_user_id).bind(remote_actor_url).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e)) + } + + async fn remove_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM federation_followers WHERE local_user_id=$1 AND remote_actor_url=$2") + .bind(local_user_id).bind(remote_actor_url) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { remote_actor_url: String, status: String, handle: String, inbox_url: String, shared_inbox_url: Option, display_name: Option, avatar_url: Option, outbox_url: Option } + sqlx::query_as::<_, Row>( + "SELECT f.remote_actor_url, f.status, COALESCE(r.handle,'') AS handle, + COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url + FROM federation_followers f + LEFT JOIN remote_actors r ON r.url=f.remote_actor_url + WHERE f.local_user_id=$1" + ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| Follower { + actor: map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url), + status: str_status(&r.status), + }).collect()) + } + + async fn get_followers_page( + &self, local_user_id: uuid::Uuid, offset: u32, limit: usize, + ) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { remote_actor_url: String, status: String, handle: String, inbox_url: String, shared_inbox_url: Option, display_name: Option, avatar_url: Option, outbox_url: Option } + sqlx::query_as::<_, Row>( + "SELECT f.remote_actor_url, f.status, COALESCE(r.handle,'') AS handle, + COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url + FROM federation_followers f + LEFT JOIN remote_actors r ON r.url=f.remote_actor_url + WHERE f.local_user_id=$1 AND f.status='accepted' + ORDER BY f.created_at DESC LIMIT $2 OFFSET $3" + ).bind(local_user_id).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| Follower { + actor: map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url), + status: str_status(&r.status), + }).collect()) + } + + async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result { + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_followers WHERE local_user_id=$1 AND status='accepted'" + ).bind(local_user_id).fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(n as usize) + } + + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { remote_actor_url: String, handle: String, inbox_url: String, shared_inbox_url: Option, display_name: Option, avatar_url: Option, outbox_url: Option } + sqlx::query_as::<_, Row>( + "SELECT f.remote_actor_url, COALESCE(r.handle,'') AS handle, + COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url + FROM federation_followers f + LEFT JOIN remote_actors r ON r.url=f.remote_actor_url + WHERE f.local_user_id=$1 AND f.status='pending'" + ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| + map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) + ).collect()) + } + + async fn update_follower_status( + &self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus, + ) -> Result<()> { + sqlx::query("UPDATE federation_followers SET status=$3 WHERE local_user_id=$1 AND remote_actor_url=$2") + .bind(local_user_id).bind(remote_actor_url).bind(status_str(&status)) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn add_following( + &self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str, + ) -> Result<()> { + self.upsert_remote_actor(actor.clone()).await?; + sqlx::query( + "INSERT INTO federation_following(local_user_id,remote_actor_url,follow_activity_id,outbox_url) + VALUES($1,$2,$3,$4) + ON CONFLICT(local_user_id,remote_actor_url) DO UPDATE + SET follow_activity_id=EXCLUDED.follow_activity_id" + ) + .bind(local_user_id).bind(&actor.url).bind(follow_activity_id).bind(&actor.outbox_url) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn get_follow_activity_id( + &self, local_user_id: uuid::Uuid, remote_actor_url: &str, + ) -> Result> { + sqlx::query_scalar::<_, String>( + "SELECT follow_activity_id FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2" + ).bind(local_user_id).bind(remote_actor_url).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e)) + } + + async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2") + .bind(local_user_id).bind(actor_url) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn get_following(&self, local_user_id: uuid::Uuid) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { remote_actor_url: String, handle: String, inbox_url: String, shared_inbox_url: Option, display_name: Option, avatar_url: Option, outbox_url: Option } + sqlx::query_as::<_, Row>( + "SELECT f.remote_actor_url, COALESCE(r.handle,'') AS handle, + COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url + FROM federation_following f + LEFT JOIN remote_actors r ON r.url=f.remote_actor_url + WHERE f.local_user_id=$1" + ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| + map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) + ).collect()) + } + + async fn get_following_page( + &self, local_user_id: uuid::Uuid, offset: u32, limit: usize, + ) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { remote_actor_url: String, handle: String, inbox_url: String, shared_inbox_url: Option, display_name: Option, avatar_url: Option, outbox_url: Option } + sqlx::query_as::<_, Row>( + "SELECT f.remote_actor_url, COALESCE(r.handle,'') AS handle, + COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url + FROM federation_following f + LEFT JOIN remote_actors r ON r.url=f.remote_actor_url + WHERE f.local_user_id=$1 + ORDER BY f.created_at DESC LIMIT $2 OFFSET $3" + ).bind(local_user_id).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| + map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) + ).collect()) + } + + async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_following WHERE local_user_id=$1" + ).bind(local_user_id).fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(n as usize) + } + + async fn update_following_status( + &self, _local_user_id: uuid::Uuid, _remote_actor_url: &str, _status: FollowingStatus, + ) -> Result<()> { + Ok(()) + } + + async fn get_following_outbox_url( + &self, local_user_id: uuid::Uuid, remote_actor_url: &str, + ) -> Result> { + sqlx::query_scalar::<_, String>( + "SELECT outbox_url FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2" + ).bind(local_user_id).bind(remote_actor_url).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e)) + } + + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { + sqlx::query( + "INSERT INTO remote_actors(url,handle,display_name,inbox_url,shared_inbox_url,public_key,avatar_url,outbox_url,last_fetched_at) + VALUES($1,$2,$3,$4,$5,'',$6,$7,NOW()) + ON CONFLICT(url) DO UPDATE SET handle=EXCLUDED.handle,display_name=EXCLUDED.display_name, + inbox_url=EXCLUDED.inbox_url,shared_inbox_url=EXCLUDED.shared_inbox_url, + avatar_url=EXCLUDED.avatar_url,outbox_url=EXCLUDED.outbox_url,last_fetched_at=NOW()" + ) + .bind(&actor.url).bind(&actor.handle).bind(&actor.display_name) + .bind(&actor.inbox_url).bind(&actor.shared_inbox_url) + .bind(&actor.avatar_url).bind(&actor.outbox_url) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn get_remote_actor(&self, actor_url: &str) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { url: String, handle: String, inbox_url: String, shared_inbox_url: Option, display_name: Option, avatar_url: Option, outbox_url: Option } + sqlx::query_as::<_, Row>( + "SELECT url,handle,inbox_url,shared_inbox_url,display_name,avatar_url,outbox_url FROM remote_actors WHERE url=$1" + ).bind(actor_url).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e)).map(|o| o.map(|r| + map_remote_actor(r.url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) + )) + } + + async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { public_key: Option, private_key: Option } + let row = sqlx::query_as::<_, Row>( + "SELECT public_key, private_key FROM users WHERE id=$1 AND local=true" + ).bind(user_id).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(row.and_then(|r| match (r.public_key, r.private_key) { + (Some(pub_k), Some(priv_k)) => Some((pub_k, priv_k)), + _ => None, + })) + } + + async fn save_local_actor_keypair( + &self, user_id: uuid::Uuid, public_key: String, private_key: String, + ) -> Result<()> { + sqlx::query("UPDATE users SET public_key=$2, private_key=$3, updated_at=NOW() WHERE id=$1") + .bind(user_id).bind(&public_key).bind(&private_key) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn add_announce( + &self, activity_id: &str, object_url: &str, actor_url: &str, announced_at: DateTime, + ) -> Result<()> { + sqlx::query( + "INSERT INTO federation_announces(activity_id,object_url,actor_url,announced_at) + VALUES($1,$2,$3,$4) ON CONFLICT(activity_id) DO NOTHING" + ).bind(activity_id).bind(object_url).bind(actor_url).bind(announced_at) + .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn count_announces(&self, object_url: &str) -> Result { + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_announces WHERE object_url=$1" + ).bind(object_url).fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(n as usize) + } + + async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()> { + sqlx::query( + "INSERT INTO federation_blocked_domains(domain,reason) VALUES($1,$2) ON CONFLICT(domain) DO NOTHING" + ).bind(domain).bind(reason).execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn remove_blocked_domain(&self, domain: &str) -> Result<()> { + sqlx::query("DELETE FROM federation_blocked_domains WHERE domain=$1") + .bind(domain).execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn get_blocked_domains(&self) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { domain: String, reason: Option, blocked_at: DateTime } + sqlx::query_as::<_, Row>("SELECT domain,reason,blocked_at FROM federation_blocked_domains ORDER BY domain") + .fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| + BlockedDomain { domain: r.domain, reason: r.reason, blocked_at: r.blocked_at.to_rfc3339() } + ).collect()) + } + + async fn is_domain_blocked(&self, domain: &str) -> Result { + let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM federation_blocked_domains WHERE domain=$1") + .bind(domain).fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(n > 0) + } + + async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { + sqlx::query( + "INSERT INTO federation_blocked_actors(local_user_id,actor_url) VALUES($1,$2) ON CONFLICT DO NOTHING" + ).bind(local_user_id).bind(actor_url).execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2") + .bind(local_user_id).bind(actor_url).execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + } + + async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result> { + sqlx::query_scalar::<_, String>( + "SELECT actor_url FROM federation_blocked_actors WHERE local_user_id=$1 ORDER BY created_at DESC" + ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)) + } + + async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result { + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2" + ).bind(local_user_id).bind(actor_url).fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(n > 0) + } +} + +// ── PostgresApUserRepository ────────────────────────────────────────────────── + +pub struct PostgresApUserRepository { + pool: PgPool, + base_url: String, +} + +impl PostgresApUserRepository { + pub fn new(pool: PgPool, base_url: String) -> Self { Self { pool, base_url } } + + fn row_to_ap_user(&self, id: uuid::Uuid, username: String, bio: Option, avatar_url: Option) -> ApUser { + let profile_url = url::Url::parse(&format!("{}/users/{}", self.base_url, username)).ok(); + let avatar_url = avatar_url.and_then(|u| url::Url::parse(&u).ok()); + ApUser { id, username, bio, avatar_url, banner_url: None, also_known_as: None, profile_url, attachment: vec![] } + } +} + +#[async_trait] +impl ApUserRepository for PostgresApUserRepository { + async fn find_by_id(&self, id: uuid::Uuid) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { id: uuid::Uuid, username: String, bio: Option, avatar_url: Option } + let row = sqlx::query_as::<_, Row>( + "SELECT id,username,bio,avatar_url FROM users WHERE id=$1 AND local=true" + ).bind(id).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(row.map(|r| self.row_to_ap_user(r.id, r.username, r.bio, r.avatar_url))) + } + + async fn find_by_username(&self, username: &str) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { id: uuid::Uuid, username: String, bio: Option, avatar_url: Option } + let row = sqlx::query_as::<_, Row>( + "SELECT id,username,bio,avatar_url FROM users WHERE username=$1 AND local=true" + ).bind(username).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(row.map(|r| self.row_to_ap_user(r.id, r.username, r.bio, r.avatar_url))) + } + + async fn count_users(&self) -> Result { + let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE local=true") + .fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; + Ok(n as usize) + } +} diff --git a/crates/adapters/postgres/migrations/005_federation_tables.sql b/crates/adapters/postgres/migrations/005_federation_tables.sql new file mode 100644 index 0000000..3d4a703 --- /dev/null +++ b/crates/adapters/postgres/migrations/005_federation_tables.sql @@ -0,0 +1,54 @@ +-- Add avatar_url and outbox_url to remote_actors (FederationRepository::RemoteActor needs them) +ALTER TABLE remote_actors + ADD COLUMN IF NOT EXISTS avatar_url TEXT, + ADD COLUMN IF NOT EXISTS outbox_url TEXT; + +-- Federation followers: remote actors following local users +CREATE TABLE IF NOT EXISTS federation_followers ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + local_user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + remote_actor_url TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + follow_activity_id TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (local_user_id, remote_actor_url) +); + +-- Federation following: local users following remote actors +CREATE TABLE IF NOT EXISTS federation_following ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + local_user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + remote_actor_url TEXT NOT NULL, + follow_activity_id TEXT NOT NULL, + outbox_url TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (local_user_id, remote_actor_url) +); + +-- Announces (boosts of remote objects via AP) +CREATE TABLE IF NOT EXISTS federation_announces ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + activity_id TEXT NOT NULL UNIQUE, + object_url TEXT NOT NULL, + actor_url TEXT NOT NULL, + announced_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Blocked domains (instance-level) +CREATE TABLE IF NOT EXISTS federation_blocked_domains ( + domain TEXT PRIMARY KEY, + reason TEXT, + blocked_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Blocked actors (per local user) +CREATE TABLE IF NOT EXISTS federation_blocked_actors ( + local_user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + actor_url TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (local_user_id, actor_url) +); + +CREATE INDEX IF NOT EXISTS idx_fed_followers_user ON federation_followers(local_user_id); +CREATE INDEX IF NOT EXISTS idx_fed_following_user ON federation_following(local_user_id); +CREATE INDEX IF NOT EXISTS idx_fed_announces_object ON federation_announces(object_url);