Compare commits

...

8 Commits

28 changed files with 2140 additions and 44 deletions

View File

@@ -5,6 +5,7 @@ edition = "2024"
[dependencies]
tokio = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
uuid = { workspace = true }

View File

@@ -1599,6 +1599,85 @@ impl domain::ports::FederationActionPort for ActivityPubService {
Ok(notes)
}
async fn fetch_actor_urls_from_collection(
&self,
collection_url: &str,
) -> Result<Vec<String>, domain::errors::DomainError> {
let resp: serde_json::Value = reqwest::Client::new()
.get(collection_url)
.header("Accept", "application/activity+json, application/ld+json")
.send()
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?
.json()
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
let empty = vec![];
let items = resp["orderedItems"].as_array().unwrap_or(&empty);
Ok(items
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect())
}
async fn resolve_actor_profiles(
&self,
urls: Vec<String>,
) -> Vec<domain::models::actor_connection_summary::ActorConnectionSummary> {
use futures::future;
async fn fetch_one(
url: String,
) -> Option<domain::models::actor_connection_summary::ActorConnectionSummary> {
let resp: serde_json::Value = tokio::time::timeout(
std::time::Duration::from_secs(5),
reqwest::Client::new()
.get(&url)
.header("Accept", "application/activity+json")
.send(),
)
.await
.ok()?
.ok()?
.json()
.await
.ok()?;
let ap_url = resp["id"].as_str()?.to_string();
let preferred_username = resp["preferredUsername"].as_str().unwrap_or("").to_string();
let domain_str = url::Url::parse(&ap_url)
.ok()
.and_then(|u| u.host_str().map(|s| s.to_string()))
.unwrap_or_default();
let handle = format!("{}@{}", preferred_username, domain_str);
let display_name = resp["name"].as_str().map(|s| s.to_string());
let avatar_url = resp["icon"]["url"].as_str().map(|s| s.to_string());
Some(
domain::models::actor_connection_summary::ActorConnectionSummary {
url: ap_url,
handle,
display_name,
avatar_url,
},
)
}
let futs: Vec<_> = urls.into_iter().map(fetch_one).collect();
let results = future::join_all(futs).await;
results
.into_iter()
.filter_map(|r| {
if r.is_none() {
tracing::warn!("failed to resolve actor profile (timeout or parse error)");
}
r
})
.collect()
}
}
#[cfg(test)]

View File

@@ -4,6 +4,12 @@ where
{
}
fn _assert_impl_federation_action_port_connections()
where
crate::service::ActivityPubService: domain::ports::FederationActionPort,
{
}
use super::*;
use crate::repository::{Follower, FollowerStatus, RemoteActor};

View File

@@ -72,6 +72,12 @@ pub enum EventPayload {
actor_ap_url: String,
outbox_url: String,
},
FetchActorConnections {
actor_ap_url: String,
collection_url: String,
connection_type: String,
page: u32,
},
}
impl EventPayload {
@@ -93,6 +99,7 @@ impl EventPayload {
Self::UserUnblocked { .. } => "users.unblocked",
Self::UserRegistered { .. } => "users.registered",
Self::FetchRemoteActorPosts { .. } => "federation.fetch_outbox",
Self::FetchActorConnections { .. } => "federation.fetch_connections",
}
}
}
@@ -209,6 +216,17 @@ impl From<&DomainEvent> for EventPayload {
actor_ap_url: actor_ap_url.clone(),
outbox_url: outbox_url.clone(),
},
DomainEvent::FetchActorConnections {
actor_ap_url,
collection_url,
connection_type,
page,
} => Self::FetchActorConnections {
actor_ap_url: actor_ap_url.clone(),
collection_url: collection_url.clone(),
connection_type: connection_type.clone(),
page: *page,
},
}
}
}
@@ -334,6 +352,17 @@ impl TryFrom<EventPayload> for DomainEvent {
actor_ap_url,
outbox_url,
},
EventPayload::FetchActorConnections {
actor_ap_url,
collection_url,
connection_type,
page,
} => DomainEvent::FetchActorConnections {
actor_ap_url,
collection_url,
connection_type,
page,
},
})
}
}
@@ -419,6 +448,12 @@ mod tests {
actor_ap_url: "https://mastodon.social/users/alice".into(),
outbox_url: "https://mastodon.social/users/alice/outbox".into(),
},
EventPayload::FetchActorConnections {
actor_ap_url: "https://mastodon.social/users/alice".into(),
collection_url: "https://mastodon.social/users/alice/followers".into(),
connection_type: "followers".into(),
page: 1,
},
];
let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect();
subjects.sort();

View File

@@ -0,0 +1,13 @@
CREATE TABLE remote_actor_connections (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_url TEXT NOT NULL,
connection_type TEXT NOT NULL,
page INT NOT NULL,
connected_actor_url TEXT NOT NULL,
connected_handle TEXT NOT NULL,
connected_display_name TEXT,
connected_avatar_url TEXT,
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(actor_url, connection_type, page, connected_actor_url)
);
CREATE INDEX ON remote_actor_connections(actor_url, connection_type, page, fetched_at);

View File

@@ -7,6 +7,7 @@ pub mod follow;
pub mod like;
pub mod notification;
pub mod remote_actor;
pub mod remote_actor_connections;
pub mod tag;
pub mod thought;
pub mod top_friend;

View File

@@ -0,0 +1,110 @@
use async_trait::async_trait;
use domain::{
errors::DomainError, models::actor_connection_summary::ActorConnectionSummary,
ports::RemoteActorConnectionRepository,
};
use sqlx::PgPool;
pub struct PgRemoteActorConnectionRepository {
pool: PgPool,
}
impl PgRemoteActorConnectionRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl RemoteActorConnectionRepository for PgRemoteActorConnectionRepository {
async fn upsert_connections(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
actors: &[ActorConnectionSummary],
) -> Result<(), DomainError> {
for actor in actors {
sqlx::query(
"INSERT INTO remote_actor_connections
(actor_url, connection_type, page, connected_actor_url,
connected_handle, connected_display_name, connected_avatar_url, fetched_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
ON CONFLICT(actor_url, connection_type, page, connected_actor_url)
DO UPDATE SET
connected_handle = EXCLUDED.connected_handle,
connected_display_name = EXCLUDED.connected_display_name,
connected_avatar_url = EXCLUDED.connected_avatar_url,
fetched_at = NOW()",
)
.bind(actor_url)
.bind(connection_type)
.bind(page as i32)
.bind(&actor.url)
.bind(&actor.handle)
.bind(&actor.display_name)
.bind(&actor.avatar_url)
.execute(&self.pool)
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
}
Ok(())
}
async fn list_connections(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
) -> Result<Vec<ActorConnectionSummary>, DomainError> {
#[derive(sqlx::FromRow)]
struct Row {
connected_actor_url: String,
connected_handle: String,
connected_display_name: Option<String>,
connected_avatar_url: Option<String>,
}
let rows = sqlx::query_as::<_, Row>(
"SELECT connected_actor_url, connected_handle, connected_display_name, connected_avatar_url
FROM remote_actor_connections
WHERE actor_url = $1 AND connection_type = $2 AND page = $3
ORDER BY connected_handle",
)
.bind(actor_url)
.bind(connection_type)
.bind(page as i32)
.fetch_all(&self.pool)
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
Ok(rows
.into_iter()
.map(|r| ActorConnectionSummary {
url: r.connected_actor_url,
handle: r.connected_handle,
display_name: r.connected_display_name,
avatar_url: r.connected_avatar_url,
})
.collect())
}
async fn connection_page_age(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
) -> Result<Option<chrono::DateTime<chrono::Utc>>, DomainError> {
let row: Option<(Option<chrono::DateTime<chrono::Utc>>,)> = sqlx::query_as(
"SELECT MAX(fetched_at) FROM remote_actor_connections
WHERE actor_url = $1 AND connection_type = $2 AND page = $3",
)
.bind(actor_url)
.bind(connection_type)
.bind(page as i32)
.fetch_optional(&self.pool)
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
Ok(row.and_then(|(ts,)| ts))
}
}

View File

@@ -110,3 +110,20 @@ pub struct RemoteActorResponse {
pub following_url: Option<String>,
pub attachment: Vec<ProfileField>,
}
#[derive(Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ActorConnectionResponse {
pub handle: String,
pub display_name: Option<String>,
pub avatar_url: Option<String>,
pub url: String,
}
#[derive(Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ActorConnectionPageResponse {
pub items: Vec<ActorConnectionResponse>,
pub page: u32,
pub has_more: bool,
}

View File

@@ -14,6 +14,7 @@ pub struct FederationEventService {
pub base_url: String,
pub federation_action: Arc<dyn domain::ports::FederationActionPort>,
pub ap_repo: Arc<dyn ActivityPubRepository>,
pub remote_actor_connections: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
}
impl FederationEventService {
@@ -157,6 +158,52 @@ impl FederationEventService {
Ok(())
}
DomainEvent::FetchActorConnections {
actor_ap_url,
collection_url,
connection_type,
page,
} => {
let urls = match self
.federation_action
.fetch_actor_urls_from_collection(collection_url)
.await
{
Ok(u) => u,
Err(e) => {
tracing::warn!(
collection_url,
error = %e,
"failed to fetch actor connections collection"
);
return Ok(());
}
};
if urls.is_empty() {
return Ok(());
}
let summaries = self.federation_action.resolve_actor_profiles(urls).await;
if summaries.is_empty() {
return Ok(());
}
tracing::info!(
count = summaries.len(),
connection_type,
actor = actor_ap_url,
"caching actor connections"
);
self.remote_actor_connections
.upsert_connections(actor_ap_url, connection_type, *page, &summaries)
.await?;
Ok(())
}
_ => Ok(()),
}
}
@@ -255,6 +302,7 @@ mod tests {
base_url: "https://example.com".to_string(),
federation_action: Arc::new(store.clone()),
ap_repo: Arc::new(store.clone()),
remote_actor_connections: Arc::new(store.clone()),
}
}
@@ -592,4 +640,19 @@ mod tests {
.unwrap();
// TestStore.fetch_outbox_page returns Ok(vec![]) — no notes, no error
}
#[tokio::test]
async fn fetch_actor_connections_is_noop_when_collection_empty() {
let store = TestStore::default();
let spy = Arc::new(SpyPort::default());
svc(&store, spy.clone())
.process(&DomainEvent::FetchActorConnections {
actor_ap_url: "https://mastodon.social/users/alice".into(),
collection_url: "https://mastodon.social/users/alice/followers".into(),
connection_type: "followers".into(),
page: 1,
})
.await
.unwrap();
}
}

View File

@@ -8,6 +8,7 @@ use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use event_transport::EventPublisherAdapter;
use nats::NatsTransport;
use postgres::activitypub::PgActivityPubRepository;
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
use presentation::state::AppState;
@@ -111,6 +112,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
events: event_publisher,
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
ap_repo: Arc::new(PgActivityPubRepository::new(pool.clone())),
remote_actor_connections: Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
};
Infrastructure { state, ap_service }

View File

@@ -64,6 +64,12 @@ pub enum DomainEvent {
actor_ap_url: String,
outbox_url: String,
},
FetchActorConnections {
actor_ap_url: String,
collection_url: String,
connection_type: String,
page: u32,
},
}
pub struct EventEnvelope {

View File

@@ -0,0 +1,7 @@
#[derive(Debug, Clone)]
pub struct ActorConnectionSummary {
pub url: String,
pub handle: String,
pub display_name: Option<String>,
pub avatar_url: Option<String>,
}

View File

@@ -0,0 +1,14 @@
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionType {
Followers,
Following,
}
impl ConnectionType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Followers => "followers",
Self::Following => "following",
}
}
}

View File

@@ -1,4 +1,6 @@
pub mod actor_connection_summary;
pub mod api_key;
pub mod connection_type;
pub mod feed;
pub mod notification;
pub mod remote_actor;

View File

@@ -194,6 +194,31 @@ pub trait RemoteActorRepository: Send + Sync {
async fn find_by_url(&self, url: &str) -> Result<Option<RemoteActor>, DomainError>;
}
#[async_trait]
pub trait RemoteActorConnectionRepository: Send + Sync {
async fn upsert_connections(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
actors: &[crate::models::actor_connection_summary::ActorConnectionSummary],
) -> Result<(), DomainError>;
async fn list_connections(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
) -> Result<Vec<crate::models::actor_connection_summary::ActorConnectionSummary>, DomainError>;
async fn connection_page_age(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
) -> Result<Option<chrono::DateTime<chrono::Utc>>, DomainError>;
}
#[async_trait]
pub trait FederationActionPort: Send + Sync {
async fn lookup_actor(&self, handle: &str) -> Result<RemoteActor, DomainError>;
@@ -214,6 +239,16 @@ pub trait FederationActionPort: Send + Sync {
outbox_url: &str,
page: u32,
) -> Result<Vec<crate::models::remote_note::RemoteNote>, DomainError>;
async fn fetch_actor_urls_from_collection(
&self,
collection_url: &str,
) -> Result<Vec<String>, DomainError>;
async fn resolve_actor_profiles(
&self,
urls: Vec<String>,
) -> Vec<crate::models::actor_connection_summary::ActorConnectionSummary>;
}
#[async_trait]

View File

@@ -575,6 +575,52 @@ impl FederationActionPort for TestStore {
) -> Result<Vec<crate::models::remote_note::RemoteNote>, DomainError> {
Ok(vec![])
}
async fn fetch_actor_urls_from_collection(
&self,
_collection_url: &str,
) -> Result<Vec<String>, DomainError> {
Ok(vec![])
}
async fn resolve_actor_profiles(
&self,
_urls: Vec<String>,
) -> Vec<crate::models::actor_connection_summary::ActorConnectionSummary> {
vec![]
}
}
#[async_trait]
impl RemoteActorConnectionRepository for TestStore {
async fn upsert_connections(
&self,
_actor_url: &str,
_connection_type: &str,
_page: u32,
_actors: &[crate::models::actor_connection_summary::ActorConnectionSummary],
) -> Result<(), DomainError> {
Ok(())
}
async fn list_connections(
&self,
_actor_url: &str,
_connection_type: &str,
_page: u32,
) -> Result<Vec<crate::models::actor_connection_summary::ActorConnectionSummary>, DomainError>
{
Ok(vec![])
}
async fn connection_page_age(
&self,
_actor_url: &str,
_connection_type: &str,
_page: u32,
) -> Result<Option<chrono::DateTime<chrono::Utc>>, DomainError> {
Ok(None)
}
}
#[async_trait]
@@ -851,6 +897,25 @@ mod federation_port_tests {
.unwrap();
assert!(notes.is_empty());
}
#[tokio::test]
async fn test_store_resolve_actor_profiles_returns_empty() {
let store = TestStore::default();
let result = store
.resolve_actor_profiles(vec!["https://example.com/users/alice".into()])
.await;
assert!(result.is_empty());
}
#[tokio::test]
async fn test_store_fetch_collection_urls_returns_empty() {
let store = TestStore::default();
let urls = store
.fetch_actor_urls_from_collection("https://example.com/users/alice/followers")
.await
.unwrap();
assert!(urls.is_empty());
}
}
#[cfg(test)]

View File

@@ -2,7 +2,10 @@ use crate::{
errors::ApiError, extractors::OptionalAuthUser, handlers::feed::to_thought_response,
state::AppState,
};
use api_types::requests::PaginationQuery;
use api_types::{
requests::PaginationQuery,
responses::{ActorConnectionPageResponse, ActorConnectionResponse},
};
use application::use_cases::feed::get_user_feed;
use axum::{
extract::{Path, Query, State},
@@ -71,6 +74,85 @@ pub async fn remote_actor_posts_handler(
})))
}
const CACHE_TTL_SECS: i64 = 3600;
pub async fn actor_followers_handler(
State(s): State<AppState>,
Path(handle): Path<String>,
Query(q): Query<PaginationQuery>,
) -> Result<Json<ActorConnectionPageResponse>, ApiError> {
actor_connections_handler(s, handle, "followers", q.page() as u32).await
}
pub async fn actor_following_handler(
State(s): State<AppState>,
Path(handle): Path<String>,
Query(q): Query<PaginationQuery>,
) -> Result<Json<ActorConnectionPageResponse>, ApiError> {
actor_connections_handler(s, handle, "following", q.page() as u32).await
}
async fn actor_connections_handler(
s: AppState,
handle: String,
connection_type: &str,
page: u32,
) -> Result<Json<ActorConnectionPageResponse>, ApiError> {
const PAGE_SIZE: usize = 20;
let actor = s.federation.lookup_actor(&handle).await?;
let collection_url = match connection_type {
"followers" => actor
.followers_url
.ok_or_else(|| ApiError::BadRequest("actor has no followers URL".into()))?,
_ => actor
.following_url
.ok_or_else(|| ApiError::BadRequest("actor has no following URL".into()))?,
};
let items = s
.remote_actor_connections
.list_connections(&actor.url, connection_type, page)
.await?;
let stale = match s
.remote_actor_connections
.connection_page_age(&actor.url, connection_type, page)
.await?
{
None => true,
Some(age) => chrono::Utc::now().signed_duration_since(age).num_seconds() > CACHE_TTL_SECS,
};
if stale {
let _ = s
.events
.publish(&DomainEvent::FetchActorConnections {
actor_ap_url: actor.url.clone(),
collection_url,
connection_type: connection_type.to_string(),
page,
})
.await;
}
let has_more = items.len() >= PAGE_SIZE;
Ok(Json(ActorConnectionPageResponse {
items: items
.into_iter()
.map(|a| ActorConnectionResponse {
handle: a.handle,
display_name: a.display_name,
avatar_url: a.avatar_url,
url: a.url,
})
.collect(),
page,
has_more,
}))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -127,6 +209,7 @@ mod tests {
events: store.clone(),
federation: store.clone(),
ap_repo: store.clone(),
remote_actor_connections: store.clone(),
}
}

View File

@@ -113,13 +113,15 @@ mod tests {
hasher: Arc::new(NoOpHasher),
events: store.clone(),
federation: store.clone(),
ap_repo: store.clone(),
remote_actor_connections: store.clone(),
}
}
fn app() -> Router {
Router::new()
.route("/notifications", patch(mark_all_read))
.route("/notifications/:id", patch(mark_notification_read))
.route("/notifications/{id}", patch(mark_notification_read))
.with_state(make_state())
}

View File

@@ -186,6 +186,8 @@ mod tests {
hasher: Arc::new(NoOpHasher),
events: store.clone(),
federation: store.clone(),
ap_repo: store.clone(),
remote_actor_connections: store.clone(),
}
}

View File

@@ -275,6 +275,7 @@ mod tests {
events: store.clone(),
federation: store.clone(),
ap_repo: store.clone(),
remote_actor_connections: store.clone(),
}
}

View File

@@ -69,6 +69,14 @@ pub fn router() -> Router<AppState> {
"/federation/actors/{handle}/posts",
get(federation_actors::remote_actor_posts_handler),
)
.route(
"/federation/actors/{handle}/followers-list",
get(federation_actors::actor_followers_handler),
)
.route(
"/federation/actors/{handle}/following-list",
get(federation_actors::actor_following_handler),
)
.route("/tags/popular", get(feed::get_popular_tags))
.route("/tags/{name}", get(feed::tag_thoughts_handler))
// notifications

View File

@@ -21,4 +21,5 @@ pub struct AppState {
pub events: Arc<dyn EventPublisher>,
pub federation: Arc<dyn FederationActionPort>,
pub ap_repo: Arc<dyn ActivityPubRepository>,
pub remote_actor_connections: Arc<dyn RemoteActorConnectionRepository>,
}

View File

@@ -6,6 +6,7 @@ use activitypub_base::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService};
use domain::ports::{ActivityPubRepository, FederationActionPort, OutboundFederationPort};
use postgres::activitypub::PgActivityPubRepository;
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
use crate::handlers::{FederationHandler, NotificationHandler};
@@ -59,6 +60,8 @@ pub async fn build(
let ap_federation = ap_service.clone() as Arc<dyn FederationActionPort>;
let ap_repo_worker =
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>;
let actor_connections = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()))
as Arc<dyn domain::ports::RemoteActorConnectionRepository>;
// Application services
let notification_svc = Arc::new(NotificationEventService {
@@ -72,6 +75,7 @@ pub async fn build(
base_url: base_url.to_string(),
federation_action: ap_federation,
ap_repo: ap_repo_worker,
remote_actor_connections: actor_connections,
});
// Thin handlers

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,213 @@
# Remote Actor Connections (Followers/Following) Design
Display a remote actor's followers and following lists in the thoughts UI, with worker-backed caching and concurrent AP profile resolution.
## Data Flow
1. User opens the Followers or Following tab on a remote actor profile
2. Frontend calls `GET /federation/actors/{handle}/followers-list?page=1`
3. Backend returns cached data immediately (may be empty on first visit)
4. If cache is empty OR older than 1 hour: publish `FetchActorConnections` event fire-and-forget
5. Worker receives event → fetches remote collection page → concurrently resolves each actor URL to a profile → stores results
6. Next visit / tab re-open shows populated data
## Domain Changes
### New models (`domain/src/models/`)
**`connection_type.rs`**:
```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionType {
Followers,
Following,
}
impl ConnectionType {
pub fn as_str(&self) -> &'static str {
match self { Self::Followers => "followers", Self::Following => "following" }
}
}
```
**`actor_connection_summary.rs`**:
```rust
#[derive(Debug, Clone)]
pub struct ActorConnectionSummary {
pub url: String, // AP URL of the connected actor
pub handle: String,
pub display_name: Option<String>,
pub avatar_url: Option<String>,
}
```
### New `DomainEvent` variant (`domain/src/events.rs`)
```rust
FetchActorConnections {
actor_ap_url: String,
collection_url: String,
connection_type: String, // "followers" | "following"
page: u32,
},
```
### New port (`domain/src/ports.rs`)
```rust
pub trait RemoteActorConnectionRepository: Send + Sync {
async fn upsert_connections(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
actors: &[ActorConnectionSummary],
) -> Result<(), DomainError>;
async fn list_connections(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
) -> Result<Vec<ActorConnectionSummary>, DomainError>;
async fn connection_page_age(
&self,
actor_url: &str,
connection_type: &str,
page: u32,
) -> Result<Option<chrono::DateTime<chrono::Utc>>, DomainError>;
}
```
### New `FederationActionPort` method
```rust
async fn resolve_actor_profiles(
&self,
urls: Vec<String>,
) -> Vec<ActorConnectionSummary>;
```
Returns only successful resolutions. Per-actor timeout: 5 seconds. Concurrent. No error propagation — failures are silently skipped (warn logged).
## Storage
### Migration: `006_remote_actor_connections.sql`
```sql
CREATE TABLE remote_actor_connections (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_url TEXT NOT NULL,
connection_type TEXT NOT NULL,
page INT NOT NULL,
connected_actor_url TEXT NOT NULL,
connected_handle TEXT NOT NULL,
connected_display_name TEXT,
connected_avatar_url TEXT,
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(actor_url, connection_type, page, connected_actor_url)
);
CREATE INDEX ON remote_actor_connections(actor_url, connection_type, page, fetched_at);
```
### `PgRemoteActorConnectionRepository`
- `upsert_connections`: `INSERT ... ON CONFLICT DO UPDATE SET connected_handle=EXCLUDED.connected_handle, connected_display_name=EXCLUDED.connected_display_name, connected_avatar_url=EXCLUDED.connected_avatar_url, fetched_at=NOW()`
- `list_connections`: `SELECT * WHERE actor_url=$1 AND connection_type=$2 AND page=$3 ORDER BY connected_handle`
- `connection_page_age`: `SELECT MAX(fetched_at) WHERE actor_url=$1 AND connection_type=$2 AND page=$3`
## activitypub-base: `resolve_actor_profiles`
`ActivityPubService` implements `FederationActionPort::resolve_actor_profiles`:
1. For each URL: spawn `tokio::time::timeout(5s, fetch_actor_profile(url))`
2. `fetch_actor_profile`: `GET {url}` with `Accept: application/activity+json` → parse `preferred_username`, `name`, `icon.url`, `id`
3. Collect `Ok` results → return as `Vec<ActorConnectionSummary>`
4. Failed/timed-out actors: `tracing::warn!` and skip
## event-payload
Add `FetchActorConnections { actor_ap_url, collection_url, connection_type, page }` to `EventPayload` — subject: `"federation.fetch_actor_connections"`. Add to `From<&DomainEvent>`, `TryFrom<EventPayload>`, and uniqueness test.
## Worker
`FederationEventService` gains `remote_actor_connections: Arc<dyn RemoteActorConnectionRepository>`.
Handler for `FetchActorConnections { actor_ap_url, collection_url, connection_type, page }`:
1. Fetch `collection_url` (as AP JSON) → extract `orderedItems` array as Vec of URL strings
2. If empty: return Ok(()) — nothing to store
3. `federation_action.resolve_actor_profiles(urls).await` — concurrent, partial success OK
4. `remote_actor_connections.upsert_connections(actor_ap_url, connection_type, page, &results).await`
5. Log: `tracing::info!(count = results.len(), "actor connections cached")`
Wire `remote_actor_connections` in `worker/src/factory.rs`.
## AppState + Bootstrap
Add `remote_actor_connections: Arc<dyn RemoteActorConnectionRepository>` to `AppState`. Wire `PgRemoteActorConnectionRepository` in `bootstrap/src/factory.rs`.
## REST Endpoints
**`GET /federation/actors/{handle}/followers-list?page=1`**
```
1. lookup_actor(handle) → get actor_ap_url + followers_url
2. list_connections(actor_ap_url, "followers", page) → cached items
3. connection_page_age(...) → if None or > 1 hour: publish FetchActorConnections (fire-and-forget)
4. Return { items: [...], page, has_more: items.len() == PAGE_SIZE }
```
`PAGE_SIZE = 20`. `has_more` tells the frontend whether to show a "next" button.
**`GET /federation/actors/{handle}/following-list?page=1`** — identical, uses `following_url` and `"following"`.
Response item shape (reuses `RemoteActorResponse` minus `bio`/`banner`/`attachment`/`outbox_url`):
```json
{ "handle": "...", "displayName": "...", "avatarUrl": "...", "url": "..." }
```
Define as a new `ActorConnectionResponse` in api-types.
Mount both routes in `routes.rs`. Add new handler file `federation_actors.rs` (already exists — add to it).
## Frontend
### `lib/api.ts`
```typescript
export const ActorConnectionSchema = z.object({
handle: z.string(),
displayName: z.string().nullable(),
avatarUrl: z.string().nullable(),
url: z.string(),
});
export type ActorConnection = z.infer<typeof ActorConnectionSchema>;
const ConnectionPageSchema = z.object({
items: z.array(ActorConnectionSchema),
page: z.number(),
hasMore: z.boolean(),
});
export const getActorFollowers = (handle, page, token) =>
apiFetch(`/federation/actors/${encodeURIComponent(handle)}/followers-list?page=${page}`, {}, ConnectionPageSchema, token);
export const getActorFollowing = (handle, page, token) =>
apiFetch(`/federation/actors/${encodeURIComponent(handle)}/following-list?page=${page}`, {}, ConnectionPageSchema, token);
```
### `RemoteUserProfile` changes
Replace the plain "Followers / Following" link section with two client-side tabs. Each tab:
- Shows a list of `RemoteUserCard` components (reuse existing)
- "Load more" button if `hasMore`
- Empty state: "Loading — check back soon."
- Tab is lazy: only fetches when first opened (not on profile load)
Use the existing `RemoteUserCard` component — it already handles follow button and linking.
### `remote-user-profile.tsx` note
The component is already a client component (`"use client"`), so React state for tab selection and paginated data works fine. Each tab fetches via `getActorFollowers`/`getActorFollowing` when first activated.

View File

@@ -3,14 +3,19 @@
import { useState } from "react";
import { useAuth } from "@/hooks/use-auth";
import Link from "next/link";
import { followUser, RemoteActor } from "@/lib/api";
import { followUser } from "@/lib/api";
import { Button } from "@/components/ui/button";
import { UserAvatar } from "@/components/user-avatar";
import { toast } from "sonner";
import { UserPlus } from "lucide-react";
interface RemoteUserCardProps {
actor: RemoteActor;
actor: {
handle: string;
displayName: string | null;
avatarUrl: string | null;
url: string;
};
}
export function RemoteUserCard({ actor }: RemoteUserCardProps) {

View File

@@ -7,7 +7,9 @@ import { ThoughtList } from "@/components/thought-list";
import { Card } from "@/components/ui/card";
import { Button } from "@/components/ui/button";
import { ExternalLink, UserPlus, UserMinus } from "lucide-react";
import { followUser, unfollowUser, RemoteActor, Thought, Me } from "@/lib/api";
import { followUser, unfollowUser, RemoteActor, Thought, Me, getActorFollowers, getActorFollowing, ActorConnection } from "@/lib/api";
import { RemoteUserCard } from "@/components/remote-user-card";
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
import { toast } from "sonner";
import { useAuth } from "@/hooks/use-auth";
@@ -26,6 +28,17 @@ export function RemoteUserProfile({
const [loading, setLoading] = useState(false);
const { token } = useAuth();
type ConnectionTab = "posts" | "followers" | "following";
const [activeTab, setActiveTab] = useState<ConnectionTab>("posts");
const [followers, setFollowers] = useState<ActorConnection[]>([]);
const [following, setFollowing] = useState<ActorConnection[]>([]);
const [followersPage, setFollowersPage] = useState(1);
const [followingPage, setFollowingPage] = useState(1);
const [followersHasMore, setFollowersHasMore] = useState(false);
const [followingHasMore, setFollowingHasMore] = useState(false);
const [followersLoaded, setFollowersLoaded] = useState(false);
const [followingLoaded, setFollowingLoaded] = useState(false);
const handleFollow = async () => {
if (!token) {
toast.error("You must be logged in to follow users.");
@@ -50,6 +63,30 @@ export function RemoteUserProfile({
}
};
const loadFollowers = async (page: number) => {
const result = await getActorFollowers(actor.handle, page, token).catch(() => null);
if (!result) return;
setFollowers((prev) => (page === 1 ? result.items : [...prev, ...result.items]));
setFollowersHasMore(result.hasMore);
setFollowersLoaded(true);
setFollowersPage(page);
};
const loadFollowing = async (page: number) => {
const result = await getActorFollowing(actor.handle, page, token).catch(() => null);
if (!result) return;
setFollowing((prev) => (page === 1 ? result.items : [...prev, ...result.items]));
setFollowingHasMore(result.hasMore);
setFollowingLoaded(true);
setFollowingPage(page);
};
const handleTabChange = (tab: string) => {
setActiveTab(tab as ConnectionTab);
if (tab === "followers" && !followersLoaded) loadFollowers(1);
if (tab === "following" && !followingLoaded) loadFollowing(1);
};
const isOwnProfile = me?.username === actor.handle;
const authorDetails = new Map<string, { avatarUrl?: string | null }>();
@@ -133,31 +170,6 @@ export function RemoteUserProfile({
</Link>
</Button>
{(actor.followersUrl || actor.followingUrl) && (
<div className="mt-3 flex gap-3 text-sm">
{actor.followersUrl && (
<Link
href={actor.followersUrl}
target="_blank"
rel="noopener noreferrer"
className="text-muted-foreground hover:text-foreground hover:underline"
>
Followers
</Link>
)}
{actor.followingUrl && (
<Link
href={actor.followingUrl}
target="_blank"
rel="noopener noreferrer"
className="text-muted-foreground hover:text-foreground hover:underline"
>
Following
</Link>
)}
</div>
)}
{actor.alsoKnownAs && (
<p className="mt-2 text-xs text-muted-foreground">
Also known as:{" "}
@@ -194,20 +206,86 @@ export function RemoteUserProfile({
</div>
</aside>
<div className="col-span-1 lg:col-span-3 space-y-4">
{initialPosts.length > 0 ? (
<ThoughtList
thoughts={initialPosts}
authorDetails={authorDetails}
currentUser={me}
/>
) : (
<Card className="flex items-center justify-center h-48">
<p className="text-center text-muted-foreground">
Posts are being fetched check back soon.
</p>
</Card>
)}
<div className="col-span-1 lg:col-span-3">
<Tabs defaultValue="posts" onValueChange={handleTabChange}>
<TabsList>
<TabsTrigger value="posts">Posts</TabsTrigger>
<TabsTrigger value="followers">Followers</TabsTrigger>
<TabsTrigger value="following">Following</TabsTrigger>
</TabsList>
<TabsContent value="posts" className="space-y-4 mt-4">
{initialPosts.length > 0 ? (
<ThoughtList
thoughts={initialPosts}
authorDetails={authorDetails}
currentUser={me}
/>
) : (
<Card className="flex items-center justify-center h-48">
<p className="text-center text-muted-foreground">
Posts are being fetched check back soon.
</p>
</Card>
)}
</TabsContent>
<TabsContent value="followers" className="mt-4">
{!followersLoaded ? (
<Card className="flex items-center justify-center h-48">
<p className="text-center text-muted-foreground">Loading followers</p>
</Card>
) : followers.length === 0 ? (
<Card className="flex items-center justify-center h-48">
<p className="text-center text-muted-foreground">
No followers cached yet check back soon.
</p>
</Card>
) : (
<div className="space-y-2">
{followers.map((f) => (
<RemoteUserCard key={f.url} actor={f} />
))}
{followersHasMore && (
<button
onClick={() => loadFollowers(followersPage + 1)}
className="w-full text-sm text-muted-foreground hover:text-foreground py-2"
>
Load more
</button>
)}
</div>
)}
</TabsContent>
<TabsContent value="following" className="mt-4">
{!followingLoaded ? (
<Card className="flex items-center justify-center h-48">
<p className="text-center text-muted-foreground">Loading following</p>
</Card>
) : following.length === 0 ? (
<Card className="flex items-center justify-center h-48">
<p className="text-center text-muted-foreground">
No following cached yet check back soon.
</p>
</Card>
) : (
<div className="space-y-2">
{following.map((f) => (
<RemoteUserCard key={f.url} actor={f} />
))}
{followingHasMore && (
<button
onClick={() => loadFollowing(followingPage + 1)}
className="w-full text-sm text-muted-foreground hover:text-foreground py-2"
>
Load more
</button>
)}
</div>
)}
</TabsContent>
</Tabs>
</div>
</main>
</div>

View File

@@ -270,6 +270,44 @@ export const getRemoteActorPosts = (
token
);
export const ActorConnectionSchema = z.object({
handle: z.string(),
displayName: z.string().nullable(),
avatarUrl: z.string().nullable(),
url: z.string(),
});
export type ActorConnection = z.infer<typeof ActorConnectionSchema>;
const ActorConnectionPageSchema = z.object({
items: z.array(ActorConnectionSchema),
page: z.number(),
hasMore: z.boolean(),
});
export const getActorFollowers = (
handle: string,
page: number,
token: string | null
) =>
apiFetch(
`/federation/actors/${encodeURIComponent(handle)}/followers-list?page=${page}`,
{},
ActorConnectionPageSchema,
token
);
export const getActorFollowing = (
handle: string,
page: number,
token: string | null
) =>
apiFetch(
`/federation/actors/${encodeURIComponent(handle)}/following-list?page=${page}`,
{},
ActorConnectionPageSchema,
token
);
export const getAllUsers = (page: number = 1, pageSize: number = 20) =>
apiFetch(
`/users?page=${page}&per_page=${pageSize}`,