From d62dde67bb7d3eabb8efd41c8a31579820971b00 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 00:29:33 +0200 Subject: [PATCH] feat(postgres): remote_actor_connections table + PgRemoteActorConnectionRepository --- .../006_remote_actor_connections.sql | 13 +++ crates/adapters/postgres/src/lib.rs | 1 + .../postgres/src/remote_actor_connections.rs | 110 ++++++++++++++++++ 3 files changed, 124 insertions(+) create mode 100644 crates/adapters/postgres/migrations/006_remote_actor_connections.sql create mode 100644 crates/adapters/postgres/src/remote_actor_connections.rs diff --git a/crates/adapters/postgres/migrations/006_remote_actor_connections.sql b/crates/adapters/postgres/migrations/006_remote_actor_connections.sql new file mode 100644 index 0000000..36edda7 --- /dev/null +++ b/crates/adapters/postgres/migrations/006_remote_actor_connections.sql @@ -0,0 +1,13 @@ +CREATE TABLE remote_actor_connections ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + actor_url TEXT NOT NULL, + connection_type TEXT NOT NULL, + page INT NOT NULL, + connected_actor_url TEXT NOT NULL, + connected_handle TEXT NOT NULL, + connected_display_name TEXT, + connected_avatar_url TEXT, + fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(actor_url, connection_type, page, connected_actor_url) +); +CREATE INDEX ON remote_actor_connections(actor_url, connection_type, page, fetched_at); diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 0c479d9..dfe8a56 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -7,6 +7,7 @@ pub mod follow; pub mod like; pub mod notification; pub mod remote_actor; +pub mod remote_actor_connections; pub mod tag; pub mod thought; pub mod top_friend; diff --git a/crates/adapters/postgres/src/remote_actor_connections.rs b/crates/adapters/postgres/src/remote_actor_connections.rs new file mode 100644 index 0000000..6259795 --- /dev/null +++ b/crates/adapters/postgres/src/remote_actor_connections.rs @@ -0,0 +1,110 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, models::actor_connection_summary::ActorConnectionSummary, + ports::RemoteActorConnectionRepository, +}; +use sqlx::PgPool; + +pub struct PgRemoteActorConnectionRepository { + pool: PgPool, +} + +impl PgRemoteActorConnectionRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl RemoteActorConnectionRepository for PgRemoteActorConnectionRepository { + async fn upsert_connections( + &self, + actor_url: &str, + connection_type: &str, + page: u32, + actors: &[ActorConnectionSummary], + ) -> Result<(), DomainError> { + for actor in actors { + sqlx::query( + "INSERT INTO remote_actor_connections + (actor_url, connection_type, page, connected_actor_url, + connected_handle, connected_display_name, connected_avatar_url, fetched_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + ON CONFLICT(actor_url, connection_type, page, connected_actor_url) + DO UPDATE SET + connected_handle = EXCLUDED.connected_handle, + connected_display_name = EXCLUDED.connected_display_name, + connected_avatar_url = EXCLUDED.connected_avatar_url, + fetched_at = NOW()", + ) + .bind(actor_url) + .bind(connection_type) + .bind(page as i32) + .bind(&actor.url) + .bind(&actor.handle) + .bind(&actor.display_name) + .bind(&actor.avatar_url) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + } + Ok(()) + } + + async fn list_connections( + &self, + actor_url: &str, + connection_type: &str, + page: u32, + ) -> Result, DomainError> { + #[derive(sqlx::FromRow)] + struct Row { + connected_actor_url: String, + connected_handle: String, + connected_display_name: Option, + connected_avatar_url: Option, + } + let rows = sqlx::query_as::<_, Row>( + "SELECT connected_actor_url, connected_handle, connected_display_name, connected_avatar_url + FROM remote_actor_connections + WHERE actor_url = $1 AND connection_type = $2 AND page = $3 + ORDER BY connected_handle", + ) + .bind(actor_url) + .bind(connection_type) + .bind(page as i32) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|r| ActorConnectionSummary { + url: r.connected_actor_url, + handle: r.connected_handle, + display_name: r.connected_display_name, + avatar_url: r.connected_avatar_url, + }) + .collect()) + } + + async fn connection_page_age( + &self, + actor_url: &str, + connection_type: &str, + page: u32, + ) -> Result>, DomainError> { + let row: Option<(Option>,)> = sqlx::query_as( + "SELECT MAX(fetched_at) FROM remote_actor_connections + WHERE actor_url = $1 AND connection_type = $2 AND page = $3", + ) + .bind(actor_url) + .bind(connection_type) + .bind(page as i32) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.and_then(|(ts,)| ts)) + } +}