Compare commits
8 Commits
8b3dfffd3b
...
3c6344f954
| Author | SHA1 | Date | |
|---|---|---|---|
| 3c6344f954 | |||
| c536cc2cd4 | |||
| 38a13ad641 | |||
| 58126f195c | |||
| d62dde67bb | |||
| 99dd89b60d | |||
| 23501f5203 | |||
| 75f59a1f40 |
@@ -5,6 +5,7 @@ edition = "2024"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
|
|||||||
@@ -1599,6 +1599,85 @@ impl domain::ports::FederationActionPort for ActivityPubService {
|
|||||||
|
|
||||||
Ok(notes)
|
Ok(notes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn fetch_actor_urls_from_collection(
|
||||||
|
&self,
|
||||||
|
collection_url: &str,
|
||||||
|
) -> Result<Vec<String>, domain::errors::DomainError> {
|
||||||
|
let resp: serde_json::Value = reqwest::Client::new()
|
||||||
|
.get(collection_url)
|
||||||
|
.header("Accept", "application/activity+json, application/ld+json")
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
|
||||||
|
|
||||||
|
let empty = vec![];
|
||||||
|
let items = resp["orderedItems"].as_array().unwrap_or(&empty);
|
||||||
|
Ok(items
|
||||||
|
.iter()
|
||||||
|
.filter_map(|v| v.as_str().map(|s| s.to_string()))
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve_actor_profiles(
|
||||||
|
&self,
|
||||||
|
urls: Vec<String>,
|
||||||
|
) -> Vec<domain::models::actor_connection_summary::ActorConnectionSummary> {
|
||||||
|
use futures::future;
|
||||||
|
|
||||||
|
async fn fetch_one(
|
||||||
|
url: String,
|
||||||
|
) -> Option<domain::models::actor_connection_summary::ActorConnectionSummary> {
|
||||||
|
let resp: serde_json::Value = tokio::time::timeout(
|
||||||
|
std::time::Duration::from_secs(5),
|
||||||
|
reqwest::Client::new()
|
||||||
|
.get(&url)
|
||||||
|
.header("Accept", "application/activity+json")
|
||||||
|
.send(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.ok()?
|
||||||
|
.ok()?
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.ok()?;
|
||||||
|
|
||||||
|
let ap_url = resp["id"].as_str()?.to_string();
|
||||||
|
let preferred_username = resp["preferredUsername"].as_str().unwrap_or("").to_string();
|
||||||
|
let domain_str = url::Url::parse(&ap_url)
|
||||||
|
.ok()
|
||||||
|
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||||
|
.unwrap_or_default();
|
||||||
|
let handle = format!("{}@{}", preferred_username, domain_str);
|
||||||
|
let display_name = resp["name"].as_str().map(|s| s.to_string());
|
||||||
|
let avatar_url = resp["icon"]["url"].as_str().map(|s| s.to_string());
|
||||||
|
|
||||||
|
Some(
|
||||||
|
domain::models::actor_connection_summary::ActorConnectionSummary {
|
||||||
|
url: ap_url,
|
||||||
|
handle,
|
||||||
|
display_name,
|
||||||
|
avatar_url,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
let futs: Vec<_> = urls.into_iter().map(fetch_one).collect();
|
||||||
|
let results = future::join_all(futs).await;
|
||||||
|
|
||||||
|
results
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|r| {
|
||||||
|
if r.is_none() {
|
||||||
|
tracing::warn!("failed to resolve actor profile (timeout or parse error)");
|
||||||
|
}
|
||||||
|
r
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -4,6 +4,12 @@ where
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn _assert_impl_federation_action_port_connections()
|
||||||
|
where
|
||||||
|
crate::service::ActivityPubService: domain::ports::FederationActionPort,
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::repository::{Follower, FollowerStatus, RemoteActor};
|
use crate::repository::{Follower, FollowerStatus, RemoteActor};
|
||||||
|
|
||||||
|
|||||||
@@ -72,6 +72,12 @@ pub enum EventPayload {
|
|||||||
actor_ap_url: String,
|
actor_ap_url: String,
|
||||||
outbox_url: String,
|
outbox_url: String,
|
||||||
},
|
},
|
||||||
|
FetchActorConnections {
|
||||||
|
actor_ap_url: String,
|
||||||
|
collection_url: String,
|
||||||
|
connection_type: String,
|
||||||
|
page: u32,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventPayload {
|
impl EventPayload {
|
||||||
@@ -93,6 +99,7 @@ impl EventPayload {
|
|||||||
Self::UserUnblocked { .. } => "users.unblocked",
|
Self::UserUnblocked { .. } => "users.unblocked",
|
||||||
Self::UserRegistered { .. } => "users.registered",
|
Self::UserRegistered { .. } => "users.registered",
|
||||||
Self::FetchRemoteActorPosts { .. } => "federation.fetch_outbox",
|
Self::FetchRemoteActorPosts { .. } => "federation.fetch_outbox",
|
||||||
|
Self::FetchActorConnections { .. } => "federation.fetch_connections",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -209,6 +216,17 @@ impl From<&DomainEvent> for EventPayload {
|
|||||||
actor_ap_url: actor_ap_url.clone(),
|
actor_ap_url: actor_ap_url.clone(),
|
||||||
outbox_url: outbox_url.clone(),
|
outbox_url: outbox_url.clone(),
|
||||||
},
|
},
|
||||||
|
DomainEvent::FetchActorConnections {
|
||||||
|
actor_ap_url,
|
||||||
|
collection_url,
|
||||||
|
connection_type,
|
||||||
|
page,
|
||||||
|
} => Self::FetchActorConnections {
|
||||||
|
actor_ap_url: actor_ap_url.clone(),
|
||||||
|
collection_url: collection_url.clone(),
|
||||||
|
connection_type: connection_type.clone(),
|
||||||
|
page: *page,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -334,6 +352,17 @@ impl TryFrom<EventPayload> for DomainEvent {
|
|||||||
actor_ap_url,
|
actor_ap_url,
|
||||||
outbox_url,
|
outbox_url,
|
||||||
},
|
},
|
||||||
|
EventPayload::FetchActorConnections {
|
||||||
|
actor_ap_url,
|
||||||
|
collection_url,
|
||||||
|
connection_type,
|
||||||
|
page,
|
||||||
|
} => DomainEvent::FetchActorConnections {
|
||||||
|
actor_ap_url,
|
||||||
|
collection_url,
|
||||||
|
connection_type,
|
||||||
|
page,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -419,6 +448,12 @@ mod tests {
|
|||||||
actor_ap_url: "https://mastodon.social/users/alice".into(),
|
actor_ap_url: "https://mastodon.social/users/alice".into(),
|
||||||
outbox_url: "https://mastodon.social/users/alice/outbox".into(),
|
outbox_url: "https://mastodon.social/users/alice/outbox".into(),
|
||||||
},
|
},
|
||||||
|
EventPayload::FetchActorConnections {
|
||||||
|
actor_ap_url: "https://mastodon.social/users/alice".into(),
|
||||||
|
collection_url: "https://mastodon.social/users/alice/followers".into(),
|
||||||
|
connection_type: "followers".into(),
|
||||||
|
page: 1,
|
||||||
|
},
|
||||||
];
|
];
|
||||||
let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect();
|
let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect();
|
||||||
subjects.sort();
|
subjects.sort();
|
||||||
|
|||||||
@@ -0,0 +1,13 @@
|
|||||||
|
CREATE TABLE remote_actor_connections (
|
||||||
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
|
actor_url TEXT NOT NULL,
|
||||||
|
connection_type TEXT NOT NULL,
|
||||||
|
page INT NOT NULL,
|
||||||
|
connected_actor_url TEXT NOT NULL,
|
||||||
|
connected_handle TEXT NOT NULL,
|
||||||
|
connected_display_name TEXT,
|
||||||
|
connected_avatar_url TEXT,
|
||||||
|
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
UNIQUE(actor_url, connection_type, page, connected_actor_url)
|
||||||
|
);
|
||||||
|
CREATE INDEX ON remote_actor_connections(actor_url, connection_type, page, fetched_at);
|
||||||
@@ -7,6 +7,7 @@ pub mod follow;
|
|||||||
pub mod like;
|
pub mod like;
|
||||||
pub mod notification;
|
pub mod notification;
|
||||||
pub mod remote_actor;
|
pub mod remote_actor;
|
||||||
|
pub mod remote_actor_connections;
|
||||||
pub mod tag;
|
pub mod tag;
|
||||||
pub mod thought;
|
pub mod thought;
|
||||||
pub mod top_friend;
|
pub mod top_friend;
|
||||||
|
|||||||
110
crates/adapters/postgres/src/remote_actor_connections.rs
Normal file
110
crates/adapters/postgres/src/remote_actor_connections.rs
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError, models::actor_connection_summary::ActorConnectionSummary,
|
||||||
|
ports::RemoteActorConnectionRepository,
|
||||||
|
};
|
||||||
|
use sqlx::PgPool;
|
||||||
|
|
||||||
|
pub struct PgRemoteActorConnectionRepository {
|
||||||
|
pool: PgPool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PgRemoteActorConnectionRepository {
|
||||||
|
pub fn new(pool: PgPool) -> Self {
|
||||||
|
Self { pool }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RemoteActorConnectionRepository for PgRemoteActorConnectionRepository {
|
||||||
|
async fn upsert_connections(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
actors: &[ActorConnectionSummary],
|
||||||
|
) -> Result<(), DomainError> {
|
||||||
|
for actor in actors {
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO remote_actor_connections
|
||||||
|
(actor_url, connection_type, page, connected_actor_url,
|
||||||
|
connected_handle, connected_display_name, connected_avatar_url, fetched_at)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
|
||||||
|
ON CONFLICT(actor_url, connection_type, page, connected_actor_url)
|
||||||
|
DO UPDATE SET
|
||||||
|
connected_handle = EXCLUDED.connected_handle,
|
||||||
|
connected_display_name = EXCLUDED.connected_display_name,
|
||||||
|
connected_avatar_url = EXCLUDED.connected_avatar_url,
|
||||||
|
fetched_at = NOW()",
|
||||||
|
)
|
||||||
|
.bind(actor_url)
|
||||||
|
.bind(connection_type)
|
||||||
|
.bind(page as i32)
|
||||||
|
.bind(&actor.url)
|
||||||
|
.bind(&actor.handle)
|
||||||
|
.bind(&actor.display_name)
|
||||||
|
.bind(&actor.avatar_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_connections(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
) -> Result<Vec<ActorConnectionSummary>, DomainError> {
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct Row {
|
||||||
|
connected_actor_url: String,
|
||||||
|
connected_handle: String,
|
||||||
|
connected_display_name: Option<String>,
|
||||||
|
connected_avatar_url: Option<String>,
|
||||||
|
}
|
||||||
|
let rows = sqlx::query_as::<_, Row>(
|
||||||
|
"SELECT connected_actor_url, connected_handle, connected_display_name, connected_avatar_url
|
||||||
|
FROM remote_actor_connections
|
||||||
|
WHERE actor_url = $1 AND connection_type = $2 AND page = $3
|
||||||
|
ORDER BY connected_handle",
|
||||||
|
)
|
||||||
|
.bind(actor_url)
|
||||||
|
.bind(connection_type)
|
||||||
|
.bind(page as i32)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| ActorConnectionSummary {
|
||||||
|
url: r.connected_actor_url,
|
||||||
|
handle: r.connected_handle,
|
||||||
|
display_name: r.connected_display_name,
|
||||||
|
avatar_url: r.connected_avatar_url,
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connection_page_age(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
) -> Result<Option<chrono::DateTime<chrono::Utc>>, DomainError> {
|
||||||
|
let row: Option<(Option<chrono::DateTime<chrono::Utc>>,)> = sqlx::query_as(
|
||||||
|
"SELECT MAX(fetched_at) FROM remote_actor_connections
|
||||||
|
WHERE actor_url = $1 AND connection_type = $2 AND page = $3",
|
||||||
|
)
|
||||||
|
.bind(actor_url)
|
||||||
|
.bind(connection_type)
|
||||||
|
.bind(page as i32)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(row.and_then(|(ts,)| ts))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -110,3 +110,20 @@ pub struct RemoteActorResponse {
|
|||||||
pub following_url: Option<String>,
|
pub following_url: Option<String>,
|
||||||
pub attachment: Vec<ProfileField>,
|
pub attachment: Vec<ProfileField>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, utoipa::ToSchema)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ActorConnectionResponse {
|
||||||
|
pub handle: String,
|
||||||
|
pub display_name: Option<String>,
|
||||||
|
pub avatar_url: Option<String>,
|
||||||
|
pub url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, utoipa::ToSchema)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ActorConnectionPageResponse {
|
||||||
|
pub items: Vec<ActorConnectionResponse>,
|
||||||
|
pub page: u32,
|
||||||
|
pub has_more: bool,
|
||||||
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ pub struct FederationEventService {
|
|||||||
pub base_url: String,
|
pub base_url: String,
|
||||||
pub federation_action: Arc<dyn domain::ports::FederationActionPort>,
|
pub federation_action: Arc<dyn domain::ports::FederationActionPort>,
|
||||||
pub ap_repo: Arc<dyn ActivityPubRepository>,
|
pub ap_repo: Arc<dyn ActivityPubRepository>,
|
||||||
|
pub remote_actor_connections: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FederationEventService {
|
impl FederationEventService {
|
||||||
@@ -157,6 +158,52 @@ impl FederationEventService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DomainEvent::FetchActorConnections {
|
||||||
|
actor_ap_url,
|
||||||
|
collection_url,
|
||||||
|
connection_type,
|
||||||
|
page,
|
||||||
|
} => {
|
||||||
|
let urls = match self
|
||||||
|
.federation_action
|
||||||
|
.fetch_actor_urls_from_collection(collection_url)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(u) => u,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
collection_url,
|
||||||
|
error = %e,
|
||||||
|
"failed to fetch actor connections collection"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if urls.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let summaries = self.federation_action.resolve_actor_profiles(urls).await;
|
||||||
|
|
||||||
|
if summaries.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
count = summaries.len(),
|
||||||
|
connection_type,
|
||||||
|
actor = actor_ap_url,
|
||||||
|
"caching actor connections"
|
||||||
|
);
|
||||||
|
|
||||||
|
self.remote_actor_connections
|
||||||
|
.upsert_connections(actor_ap_url, connection_type, *page, &summaries)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
_ => Ok(()),
|
_ => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -255,6 +302,7 @@ mod tests {
|
|||||||
base_url: "https://example.com".to_string(),
|
base_url: "https://example.com".to_string(),
|
||||||
federation_action: Arc::new(store.clone()),
|
federation_action: Arc::new(store.clone()),
|
||||||
ap_repo: Arc::new(store.clone()),
|
ap_repo: Arc::new(store.clone()),
|
||||||
|
remote_actor_connections: Arc::new(store.clone()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -592,4 +640,19 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
// TestStore.fetch_outbox_page returns Ok(vec![]) — no notes, no error
|
// TestStore.fetch_outbox_page returns Ok(vec![]) — no notes, no error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_actor_connections_is_noop_when_collection_empty() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let spy = Arc::new(SpyPort::default());
|
||||||
|
svc(&store, spy.clone())
|
||||||
|
.process(&DomainEvent::FetchActorConnections {
|
||||||
|
actor_ap_url: "https://mastodon.social/users/alice".into(),
|
||||||
|
collection_url: "https://mastodon.social/users/alice/followers".into(),
|
||||||
|
connection_type: "followers".into(),
|
||||||
|
page: 1,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
|||||||
use event_transport::EventPublisherAdapter;
|
use event_transport::EventPublisherAdapter;
|
||||||
use nats::NatsTransport;
|
use nats::NatsTransport;
|
||||||
use postgres::activitypub::PgActivityPubRepository;
|
use postgres::activitypub::PgActivityPubRepository;
|
||||||
|
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
|
||||||
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
||||||
use presentation::state::AppState;
|
use presentation::state::AppState;
|
||||||
|
|
||||||
@@ -111,6 +112,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
|
|||||||
events: event_publisher,
|
events: event_publisher,
|
||||||
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
|
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
|
||||||
ap_repo: Arc::new(PgActivityPubRepository::new(pool.clone())),
|
ap_repo: Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||||
|
remote_actor_connections: Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
|
||||||
};
|
};
|
||||||
|
|
||||||
Infrastructure { state, ap_service }
|
Infrastructure { state, ap_service }
|
||||||
|
|||||||
@@ -64,6 +64,12 @@ pub enum DomainEvent {
|
|||||||
actor_ap_url: String,
|
actor_ap_url: String,
|
||||||
outbox_url: String,
|
outbox_url: String,
|
||||||
},
|
},
|
||||||
|
FetchActorConnections {
|
||||||
|
actor_ap_url: String,
|
||||||
|
collection_url: String,
|
||||||
|
connection_type: String,
|
||||||
|
page: u32,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct EventEnvelope {
|
pub struct EventEnvelope {
|
||||||
|
|||||||
7
crates/domain/src/models/actor_connection_summary.rs
Normal file
7
crates/domain/src/models/actor_connection_summary.rs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ActorConnectionSummary {
|
||||||
|
pub url: String,
|
||||||
|
pub handle: String,
|
||||||
|
pub display_name: Option<String>,
|
||||||
|
pub avatar_url: Option<String>,
|
||||||
|
}
|
||||||
14
crates/domain/src/models/connection_type.rs
Normal file
14
crates/domain/src/models/connection_type.rs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum ConnectionType {
|
||||||
|
Followers,
|
||||||
|
Following,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionType {
|
||||||
|
pub fn as_str(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::Followers => "followers",
|
||||||
|
Self::Following => "following",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,6 @@
|
|||||||
|
pub mod actor_connection_summary;
|
||||||
pub mod api_key;
|
pub mod api_key;
|
||||||
|
pub mod connection_type;
|
||||||
pub mod feed;
|
pub mod feed;
|
||||||
pub mod notification;
|
pub mod notification;
|
||||||
pub mod remote_actor;
|
pub mod remote_actor;
|
||||||
|
|||||||
@@ -194,6 +194,31 @@ pub trait RemoteActorRepository: Send + Sync {
|
|||||||
async fn find_by_url(&self, url: &str) -> Result<Option<RemoteActor>, DomainError>;
|
async fn find_by_url(&self, url: &str) -> Result<Option<RemoteActor>, DomainError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait RemoteActorConnectionRepository: Send + Sync {
|
||||||
|
async fn upsert_connections(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
actors: &[crate::models::actor_connection_summary::ActorConnectionSummary],
|
||||||
|
) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
async fn list_connections(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
) -> Result<Vec<crate::models::actor_connection_summary::ActorConnectionSummary>, DomainError>;
|
||||||
|
|
||||||
|
async fn connection_page_age(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
) -> Result<Option<chrono::DateTime<chrono::Utc>>, DomainError>;
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait FederationActionPort: Send + Sync {
|
pub trait FederationActionPort: Send + Sync {
|
||||||
async fn lookup_actor(&self, handle: &str) -> Result<RemoteActor, DomainError>;
|
async fn lookup_actor(&self, handle: &str) -> Result<RemoteActor, DomainError>;
|
||||||
@@ -214,6 +239,16 @@ pub trait FederationActionPort: Send + Sync {
|
|||||||
outbox_url: &str,
|
outbox_url: &str,
|
||||||
page: u32,
|
page: u32,
|
||||||
) -> Result<Vec<crate::models::remote_note::RemoteNote>, DomainError>;
|
) -> Result<Vec<crate::models::remote_note::RemoteNote>, DomainError>;
|
||||||
|
|
||||||
|
async fn fetch_actor_urls_from_collection(
|
||||||
|
&self,
|
||||||
|
collection_url: &str,
|
||||||
|
) -> Result<Vec<String>, DomainError>;
|
||||||
|
|
||||||
|
async fn resolve_actor_profiles(
|
||||||
|
&self,
|
||||||
|
urls: Vec<String>,
|
||||||
|
) -> Vec<crate::models::actor_connection_summary::ActorConnectionSummary>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -575,6 +575,52 @@ impl FederationActionPort for TestStore {
|
|||||||
) -> Result<Vec<crate::models::remote_note::RemoteNote>, DomainError> {
|
) -> Result<Vec<crate::models::remote_note::RemoteNote>, DomainError> {
|
||||||
Ok(vec![])
|
Ok(vec![])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn fetch_actor_urls_from_collection(
|
||||||
|
&self,
|
||||||
|
_collection_url: &str,
|
||||||
|
) -> Result<Vec<String>, DomainError> {
|
||||||
|
Ok(vec![])
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve_actor_profiles(
|
||||||
|
&self,
|
||||||
|
_urls: Vec<String>,
|
||||||
|
) -> Vec<crate::models::actor_connection_summary::ActorConnectionSummary> {
|
||||||
|
vec![]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RemoteActorConnectionRepository for TestStore {
|
||||||
|
async fn upsert_connections(
|
||||||
|
&self,
|
||||||
|
_actor_url: &str,
|
||||||
|
_connection_type: &str,
|
||||||
|
_page: u32,
|
||||||
|
_actors: &[crate::models::actor_connection_summary::ActorConnectionSummary],
|
||||||
|
) -> Result<(), DomainError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_connections(
|
||||||
|
&self,
|
||||||
|
_actor_url: &str,
|
||||||
|
_connection_type: &str,
|
||||||
|
_page: u32,
|
||||||
|
) -> Result<Vec<crate::models::actor_connection_summary::ActorConnectionSummary>, DomainError>
|
||||||
|
{
|
||||||
|
Ok(vec![])
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connection_page_age(
|
||||||
|
&self,
|
||||||
|
_actor_url: &str,
|
||||||
|
_connection_type: &str,
|
||||||
|
_page: u32,
|
||||||
|
) -> Result<Option<chrono::DateTime<chrono::Utc>>, DomainError> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -851,6 +897,25 @@ mod federation_port_tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(notes.is_empty());
|
assert!(notes.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_store_resolve_actor_profiles_returns_empty() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let result = store
|
||||||
|
.resolve_actor_profiles(vec!["https://example.com/users/alice".into()])
|
||||||
|
.await;
|
||||||
|
assert!(result.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_store_fetch_collection_urls_returns_empty() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let urls = store
|
||||||
|
.fetch_actor_urls_from_collection("https://example.com/users/alice/followers")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(urls.is_empty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -2,7 +2,10 @@ use crate::{
|
|||||||
errors::ApiError, extractors::OptionalAuthUser, handlers::feed::to_thought_response,
|
errors::ApiError, extractors::OptionalAuthUser, handlers::feed::to_thought_response,
|
||||||
state::AppState,
|
state::AppState,
|
||||||
};
|
};
|
||||||
use api_types::requests::PaginationQuery;
|
use api_types::{
|
||||||
|
requests::PaginationQuery,
|
||||||
|
responses::{ActorConnectionPageResponse, ActorConnectionResponse},
|
||||||
|
};
|
||||||
use application::use_cases::feed::get_user_feed;
|
use application::use_cases::feed::get_user_feed;
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{Path, Query, State},
|
extract::{Path, Query, State},
|
||||||
@@ -71,6 +74,85 @@ pub async fn remote_actor_posts_handler(
|
|||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const CACHE_TTL_SECS: i64 = 3600;
|
||||||
|
|
||||||
|
pub async fn actor_followers_handler(
|
||||||
|
State(s): State<AppState>,
|
||||||
|
Path(handle): Path<String>,
|
||||||
|
Query(q): Query<PaginationQuery>,
|
||||||
|
) -> Result<Json<ActorConnectionPageResponse>, ApiError> {
|
||||||
|
actor_connections_handler(s, handle, "followers", q.page() as u32).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn actor_following_handler(
|
||||||
|
State(s): State<AppState>,
|
||||||
|
Path(handle): Path<String>,
|
||||||
|
Query(q): Query<PaginationQuery>,
|
||||||
|
) -> Result<Json<ActorConnectionPageResponse>, ApiError> {
|
||||||
|
actor_connections_handler(s, handle, "following", q.page() as u32).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn actor_connections_handler(
|
||||||
|
s: AppState,
|
||||||
|
handle: String,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
) -> Result<Json<ActorConnectionPageResponse>, ApiError> {
|
||||||
|
const PAGE_SIZE: usize = 20;
|
||||||
|
|
||||||
|
let actor = s.federation.lookup_actor(&handle).await?;
|
||||||
|
|
||||||
|
let collection_url = match connection_type {
|
||||||
|
"followers" => actor
|
||||||
|
.followers_url
|
||||||
|
.ok_or_else(|| ApiError::BadRequest("actor has no followers URL".into()))?,
|
||||||
|
_ => actor
|
||||||
|
.following_url
|
||||||
|
.ok_or_else(|| ApiError::BadRequest("actor has no following URL".into()))?,
|
||||||
|
};
|
||||||
|
|
||||||
|
let items = s
|
||||||
|
.remote_actor_connections
|
||||||
|
.list_connections(&actor.url, connection_type, page)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let stale = match s
|
||||||
|
.remote_actor_connections
|
||||||
|
.connection_page_age(&actor.url, connection_type, page)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
None => true,
|
||||||
|
Some(age) => chrono::Utc::now().signed_duration_since(age).num_seconds() > CACHE_TTL_SECS,
|
||||||
|
};
|
||||||
|
|
||||||
|
if stale {
|
||||||
|
let _ = s
|
||||||
|
.events
|
||||||
|
.publish(&DomainEvent::FetchActorConnections {
|
||||||
|
actor_ap_url: actor.url.clone(),
|
||||||
|
collection_url,
|
||||||
|
connection_type: connection_type.to_string(),
|
||||||
|
page,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let has_more = items.len() >= PAGE_SIZE;
|
||||||
|
Ok(Json(ActorConnectionPageResponse {
|
||||||
|
items: items
|
||||||
|
.into_iter()
|
||||||
|
.map(|a| ActorConnectionResponse {
|
||||||
|
handle: a.handle,
|
||||||
|
display_name: a.display_name,
|
||||||
|
avatar_url: a.avatar_url,
|
||||||
|
url: a.url,
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
page,
|
||||||
|
has_more,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -127,6 +209,7 @@ mod tests {
|
|||||||
events: store.clone(),
|
events: store.clone(),
|
||||||
federation: store.clone(),
|
federation: store.clone(),
|
||||||
ap_repo: store.clone(),
|
ap_repo: store.clone(),
|
||||||
|
remote_actor_connections: store.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -113,13 +113,15 @@ mod tests {
|
|||||||
hasher: Arc::new(NoOpHasher),
|
hasher: Arc::new(NoOpHasher),
|
||||||
events: store.clone(),
|
events: store.clone(),
|
||||||
federation: store.clone(),
|
federation: store.clone(),
|
||||||
|
ap_repo: store.clone(),
|
||||||
|
remote_actor_connections: store.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn app() -> Router {
|
fn app() -> Router {
|
||||||
Router::new()
|
Router::new()
|
||||||
.route("/notifications", patch(mark_all_read))
|
.route("/notifications", patch(mark_all_read))
|
||||||
.route("/notifications/:id", patch(mark_notification_read))
|
.route("/notifications/{id}", patch(mark_notification_read))
|
||||||
.with_state(make_state())
|
.with_state(make_state())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -186,6 +186,8 @@ mod tests {
|
|||||||
hasher: Arc::new(NoOpHasher),
|
hasher: Arc::new(NoOpHasher),
|
||||||
events: store.clone(),
|
events: store.clone(),
|
||||||
federation: store.clone(),
|
federation: store.clone(),
|
||||||
|
ap_repo: store.clone(),
|
||||||
|
remote_actor_connections: store.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -275,6 +275,7 @@ mod tests {
|
|||||||
events: store.clone(),
|
events: store.clone(),
|
||||||
federation: store.clone(),
|
federation: store.clone(),
|
||||||
ap_repo: store.clone(),
|
ap_repo: store.clone(),
|
||||||
|
remote_actor_connections: store.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -69,6 +69,14 @@ pub fn router() -> Router<AppState> {
|
|||||||
"/federation/actors/{handle}/posts",
|
"/federation/actors/{handle}/posts",
|
||||||
get(federation_actors::remote_actor_posts_handler),
|
get(federation_actors::remote_actor_posts_handler),
|
||||||
)
|
)
|
||||||
|
.route(
|
||||||
|
"/federation/actors/{handle}/followers-list",
|
||||||
|
get(federation_actors::actor_followers_handler),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/federation/actors/{handle}/following-list",
|
||||||
|
get(federation_actors::actor_following_handler),
|
||||||
|
)
|
||||||
.route("/tags/popular", get(feed::get_popular_tags))
|
.route("/tags/popular", get(feed::get_popular_tags))
|
||||||
.route("/tags/{name}", get(feed::tag_thoughts_handler))
|
.route("/tags/{name}", get(feed::tag_thoughts_handler))
|
||||||
// notifications
|
// notifications
|
||||||
|
|||||||
@@ -21,4 +21,5 @@ pub struct AppState {
|
|||||||
pub events: Arc<dyn EventPublisher>,
|
pub events: Arc<dyn EventPublisher>,
|
||||||
pub federation: Arc<dyn FederationActionPort>,
|
pub federation: Arc<dyn FederationActionPort>,
|
||||||
pub ap_repo: Arc<dyn ActivityPubRepository>,
|
pub ap_repo: Arc<dyn ActivityPubRepository>,
|
||||||
|
pub remote_actor_connections: Arc<dyn RemoteActorConnectionRepository>,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use activitypub_base::ActivityPubService;
|
|||||||
use application::services::{FederationEventService, NotificationEventService};
|
use application::services::{FederationEventService, NotificationEventService};
|
||||||
use domain::ports::{ActivityPubRepository, FederationActionPort, OutboundFederationPort};
|
use domain::ports::{ActivityPubRepository, FederationActionPort, OutboundFederationPort};
|
||||||
use postgres::activitypub::PgActivityPubRepository;
|
use postgres::activitypub::PgActivityPubRepository;
|
||||||
|
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
|
||||||
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
||||||
|
|
||||||
use crate::handlers::{FederationHandler, NotificationHandler};
|
use crate::handlers::{FederationHandler, NotificationHandler};
|
||||||
@@ -59,6 +60,8 @@ pub async fn build(
|
|||||||
let ap_federation = ap_service.clone() as Arc<dyn FederationActionPort>;
|
let ap_federation = ap_service.clone() as Arc<dyn FederationActionPort>;
|
||||||
let ap_repo_worker =
|
let ap_repo_worker =
|
||||||
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>;
|
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>;
|
||||||
|
let actor_connections = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()))
|
||||||
|
as Arc<dyn domain::ports::RemoteActorConnectionRepository>;
|
||||||
|
|
||||||
// Application services
|
// Application services
|
||||||
let notification_svc = Arc::new(NotificationEventService {
|
let notification_svc = Arc::new(NotificationEventService {
|
||||||
@@ -72,6 +75,7 @@ pub async fn build(
|
|||||||
base_url: base_url.to_string(),
|
base_url: base_url.to_string(),
|
||||||
federation_action: ap_federation,
|
federation_action: ap_federation,
|
||||||
ap_repo: ap_repo_worker,
|
ap_repo: ap_repo_worker,
|
||||||
|
remote_actor_connections: actor_connections,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Thin handlers
|
// Thin handlers
|
||||||
|
|||||||
1205
docs/superpowers/plans/2026-05-15-actor-connections.md
Normal file
1205
docs/superpowers/plans/2026-05-15-actor-connections.md
Normal file
File diff suppressed because it is too large
Load Diff
213
docs/superpowers/specs/2026-05-15-actor-connections-design.md
Normal file
213
docs/superpowers/specs/2026-05-15-actor-connections-design.md
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
# Remote Actor Connections (Followers/Following) Design
|
||||||
|
|
||||||
|
Display a remote actor's followers and following lists in the thoughts UI, with worker-backed caching and concurrent AP profile resolution.
|
||||||
|
|
||||||
|
## Data Flow
|
||||||
|
|
||||||
|
1. User opens the Followers or Following tab on a remote actor profile
|
||||||
|
2. Frontend calls `GET /federation/actors/{handle}/followers-list?page=1`
|
||||||
|
3. Backend returns cached data immediately (may be empty on first visit)
|
||||||
|
4. If cache is empty OR older than 1 hour: publish `FetchActorConnections` event fire-and-forget
|
||||||
|
5. Worker receives event → fetches remote collection page → concurrently resolves each actor URL to a profile → stores results
|
||||||
|
6. Next visit / tab re-open shows populated data
|
||||||
|
|
||||||
|
## Domain Changes
|
||||||
|
|
||||||
|
### New models (`domain/src/models/`)
|
||||||
|
|
||||||
|
**`connection_type.rs`**:
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum ConnectionType {
|
||||||
|
Followers,
|
||||||
|
Following,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionType {
|
||||||
|
pub fn as_str(&self) -> &'static str {
|
||||||
|
match self { Self::Followers => "followers", Self::Following => "following" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**`actor_connection_summary.rs`**:
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ActorConnectionSummary {
|
||||||
|
pub url: String, // AP URL of the connected actor
|
||||||
|
pub handle: String,
|
||||||
|
pub display_name: Option<String>,
|
||||||
|
pub avatar_url: Option<String>,
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### New `DomainEvent` variant (`domain/src/events.rs`)
|
||||||
|
|
||||||
|
```rust
|
||||||
|
FetchActorConnections {
|
||||||
|
actor_ap_url: String,
|
||||||
|
collection_url: String,
|
||||||
|
connection_type: String, // "followers" | "following"
|
||||||
|
page: u32,
|
||||||
|
},
|
||||||
|
```
|
||||||
|
|
||||||
|
### New port (`domain/src/ports.rs`)
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub trait RemoteActorConnectionRepository: Send + Sync {
|
||||||
|
async fn upsert_connections(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
actors: &[ActorConnectionSummary],
|
||||||
|
) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
async fn list_connections(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
) -> Result<Vec<ActorConnectionSummary>, DomainError>;
|
||||||
|
|
||||||
|
async fn connection_page_age(
|
||||||
|
&self,
|
||||||
|
actor_url: &str,
|
||||||
|
connection_type: &str,
|
||||||
|
page: u32,
|
||||||
|
) -> Result<Option<chrono::DateTime<chrono::Utc>>, DomainError>;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### New `FederationActionPort` method
|
||||||
|
|
||||||
|
```rust
|
||||||
|
async fn resolve_actor_profiles(
|
||||||
|
&self,
|
||||||
|
urls: Vec<String>,
|
||||||
|
) -> Vec<ActorConnectionSummary>;
|
||||||
|
```
|
||||||
|
|
||||||
|
Returns only successful resolutions. Per-actor timeout: 5 seconds. Concurrent. No error propagation — failures are silently skipped (warn logged).
|
||||||
|
|
||||||
|
## Storage
|
||||||
|
|
||||||
|
### Migration: `006_remote_actor_connections.sql`
|
||||||
|
|
||||||
|
```sql
|
||||||
|
CREATE TABLE remote_actor_connections (
|
||||||
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
|
actor_url TEXT NOT NULL,
|
||||||
|
connection_type TEXT NOT NULL,
|
||||||
|
page INT NOT NULL,
|
||||||
|
connected_actor_url TEXT NOT NULL,
|
||||||
|
connected_handle TEXT NOT NULL,
|
||||||
|
connected_display_name TEXT,
|
||||||
|
connected_avatar_url TEXT,
|
||||||
|
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
UNIQUE(actor_url, connection_type, page, connected_actor_url)
|
||||||
|
);
|
||||||
|
CREATE INDEX ON remote_actor_connections(actor_url, connection_type, page, fetched_at);
|
||||||
|
```
|
||||||
|
|
||||||
|
### `PgRemoteActorConnectionRepository`
|
||||||
|
|
||||||
|
- `upsert_connections`: `INSERT ... ON CONFLICT DO UPDATE SET connected_handle=EXCLUDED.connected_handle, connected_display_name=EXCLUDED.connected_display_name, connected_avatar_url=EXCLUDED.connected_avatar_url, fetched_at=NOW()`
|
||||||
|
- `list_connections`: `SELECT * WHERE actor_url=$1 AND connection_type=$2 AND page=$3 ORDER BY connected_handle`
|
||||||
|
- `connection_page_age`: `SELECT MAX(fetched_at) WHERE actor_url=$1 AND connection_type=$2 AND page=$3`
|
||||||
|
|
||||||
|
## activitypub-base: `resolve_actor_profiles`
|
||||||
|
|
||||||
|
`ActivityPubService` implements `FederationActionPort::resolve_actor_profiles`:
|
||||||
|
|
||||||
|
1. For each URL: spawn `tokio::time::timeout(5s, fetch_actor_profile(url))`
|
||||||
|
2. `fetch_actor_profile`: `GET {url}` with `Accept: application/activity+json` → parse `preferred_username`, `name`, `icon.url`, `id`
|
||||||
|
3. Collect `Ok` results → return as `Vec<ActorConnectionSummary>`
|
||||||
|
4. Failed/timed-out actors: `tracing::warn!` and skip
|
||||||
|
|
||||||
|
## event-payload
|
||||||
|
|
||||||
|
Add `FetchActorConnections { actor_ap_url, collection_url, connection_type, page }` to `EventPayload` — subject: `"federation.fetch_actor_connections"`. Add to `From<&DomainEvent>`, `TryFrom<EventPayload>`, and uniqueness test.
|
||||||
|
|
||||||
|
## Worker
|
||||||
|
|
||||||
|
`FederationEventService` gains `remote_actor_connections: Arc<dyn RemoteActorConnectionRepository>`.
|
||||||
|
|
||||||
|
Handler for `FetchActorConnections { actor_ap_url, collection_url, connection_type, page }`:
|
||||||
|
|
||||||
|
1. Fetch `collection_url` (as AP JSON) → extract `orderedItems` array as Vec of URL strings
|
||||||
|
2. If empty: return Ok(()) — nothing to store
|
||||||
|
3. `federation_action.resolve_actor_profiles(urls).await` — concurrent, partial success OK
|
||||||
|
4. `remote_actor_connections.upsert_connections(actor_ap_url, connection_type, page, &results).await`
|
||||||
|
5. Log: `tracing::info!(count = results.len(), "actor connections cached")`
|
||||||
|
|
||||||
|
Wire `remote_actor_connections` in `worker/src/factory.rs`.
|
||||||
|
|
||||||
|
## AppState + Bootstrap
|
||||||
|
|
||||||
|
Add `remote_actor_connections: Arc<dyn RemoteActorConnectionRepository>` to `AppState`. Wire `PgRemoteActorConnectionRepository` in `bootstrap/src/factory.rs`.
|
||||||
|
|
||||||
|
## REST Endpoints
|
||||||
|
|
||||||
|
**`GET /federation/actors/{handle}/followers-list?page=1`**
|
||||||
|
|
||||||
|
```
|
||||||
|
1. lookup_actor(handle) → get actor_ap_url + followers_url
|
||||||
|
2. list_connections(actor_ap_url, "followers", page) → cached items
|
||||||
|
3. connection_page_age(...) → if None or > 1 hour: publish FetchActorConnections (fire-and-forget)
|
||||||
|
4. Return { items: [...], page, has_more: items.len() == PAGE_SIZE }
|
||||||
|
```
|
||||||
|
|
||||||
|
`PAGE_SIZE = 20`. `has_more` tells the frontend whether to show a "next" button.
|
||||||
|
|
||||||
|
**`GET /federation/actors/{handle}/following-list?page=1`** — identical, uses `following_url` and `"following"`.
|
||||||
|
|
||||||
|
Response item shape (reuses `RemoteActorResponse` minus `bio`/`banner`/`attachment`/`outbox_url`):
|
||||||
|
```json
|
||||||
|
{ "handle": "...", "displayName": "...", "avatarUrl": "...", "url": "..." }
|
||||||
|
```
|
||||||
|
|
||||||
|
Define as a new `ActorConnectionResponse` in api-types.
|
||||||
|
|
||||||
|
Mount both routes in `routes.rs`. Add new handler file `federation_actors.rs` (already exists — add to it).
|
||||||
|
|
||||||
|
## Frontend
|
||||||
|
|
||||||
|
### `lib/api.ts`
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
export const ActorConnectionSchema = z.object({
|
||||||
|
handle: z.string(),
|
||||||
|
displayName: z.string().nullable(),
|
||||||
|
avatarUrl: z.string().nullable(),
|
||||||
|
url: z.string(),
|
||||||
|
});
|
||||||
|
export type ActorConnection = z.infer<typeof ActorConnectionSchema>;
|
||||||
|
|
||||||
|
const ConnectionPageSchema = z.object({
|
||||||
|
items: z.array(ActorConnectionSchema),
|
||||||
|
page: z.number(),
|
||||||
|
hasMore: z.boolean(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export const getActorFollowers = (handle, page, token) =>
|
||||||
|
apiFetch(`/federation/actors/${encodeURIComponent(handle)}/followers-list?page=${page}`, {}, ConnectionPageSchema, token);
|
||||||
|
|
||||||
|
export const getActorFollowing = (handle, page, token) =>
|
||||||
|
apiFetch(`/federation/actors/${encodeURIComponent(handle)}/following-list?page=${page}`, {}, ConnectionPageSchema, token);
|
||||||
|
```
|
||||||
|
|
||||||
|
### `RemoteUserProfile` changes
|
||||||
|
|
||||||
|
Replace the plain "Followers / Following" link section with two client-side tabs. Each tab:
|
||||||
|
- Shows a list of `RemoteUserCard` components (reuse existing)
|
||||||
|
- "Load more" button if `hasMore`
|
||||||
|
- Empty state: "Loading — check back soon."
|
||||||
|
- Tab is lazy: only fetches when first opened (not on profile load)
|
||||||
|
|
||||||
|
Use the existing `RemoteUserCard` component — it already handles follow button and linking.
|
||||||
|
|
||||||
|
### `remote-user-profile.tsx` note
|
||||||
|
|
||||||
|
The component is already a client component (`"use client"`), so React state for tab selection and paginated data works fine. Each tab fetches via `getActorFollowers`/`getActorFollowing` when first activated.
|
||||||
@@ -3,14 +3,19 @@
|
|||||||
import { useState } from "react";
|
import { useState } from "react";
|
||||||
import { useAuth } from "@/hooks/use-auth";
|
import { useAuth } from "@/hooks/use-auth";
|
||||||
import Link from "next/link";
|
import Link from "next/link";
|
||||||
import { followUser, RemoteActor } from "@/lib/api";
|
import { followUser } from "@/lib/api";
|
||||||
import { Button } from "@/components/ui/button";
|
import { Button } from "@/components/ui/button";
|
||||||
import { UserAvatar } from "@/components/user-avatar";
|
import { UserAvatar } from "@/components/user-avatar";
|
||||||
import { toast } from "sonner";
|
import { toast } from "sonner";
|
||||||
import { UserPlus } from "lucide-react";
|
import { UserPlus } from "lucide-react";
|
||||||
|
|
||||||
interface RemoteUserCardProps {
|
interface RemoteUserCardProps {
|
||||||
actor: RemoteActor;
|
actor: {
|
||||||
|
handle: string;
|
||||||
|
displayName: string | null;
|
||||||
|
avatarUrl: string | null;
|
||||||
|
url: string;
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export function RemoteUserCard({ actor }: RemoteUserCardProps) {
|
export function RemoteUserCard({ actor }: RemoteUserCardProps) {
|
||||||
|
|||||||
@@ -7,7 +7,9 @@ import { ThoughtList } from "@/components/thought-list";
|
|||||||
import { Card } from "@/components/ui/card";
|
import { Card } from "@/components/ui/card";
|
||||||
import { Button } from "@/components/ui/button";
|
import { Button } from "@/components/ui/button";
|
||||||
import { ExternalLink, UserPlus, UserMinus } from "lucide-react";
|
import { ExternalLink, UserPlus, UserMinus } from "lucide-react";
|
||||||
import { followUser, unfollowUser, RemoteActor, Thought, Me } from "@/lib/api";
|
import { followUser, unfollowUser, RemoteActor, Thought, Me, getActorFollowers, getActorFollowing, ActorConnection } from "@/lib/api";
|
||||||
|
import { RemoteUserCard } from "@/components/remote-user-card";
|
||||||
|
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
|
||||||
import { toast } from "sonner";
|
import { toast } from "sonner";
|
||||||
import { useAuth } from "@/hooks/use-auth";
|
import { useAuth } from "@/hooks/use-auth";
|
||||||
|
|
||||||
@@ -26,6 +28,17 @@ export function RemoteUserProfile({
|
|||||||
const [loading, setLoading] = useState(false);
|
const [loading, setLoading] = useState(false);
|
||||||
const { token } = useAuth();
|
const { token } = useAuth();
|
||||||
|
|
||||||
|
type ConnectionTab = "posts" | "followers" | "following";
|
||||||
|
const [activeTab, setActiveTab] = useState<ConnectionTab>("posts");
|
||||||
|
const [followers, setFollowers] = useState<ActorConnection[]>([]);
|
||||||
|
const [following, setFollowing] = useState<ActorConnection[]>([]);
|
||||||
|
const [followersPage, setFollowersPage] = useState(1);
|
||||||
|
const [followingPage, setFollowingPage] = useState(1);
|
||||||
|
const [followersHasMore, setFollowersHasMore] = useState(false);
|
||||||
|
const [followingHasMore, setFollowingHasMore] = useState(false);
|
||||||
|
const [followersLoaded, setFollowersLoaded] = useState(false);
|
||||||
|
const [followingLoaded, setFollowingLoaded] = useState(false);
|
||||||
|
|
||||||
const handleFollow = async () => {
|
const handleFollow = async () => {
|
||||||
if (!token) {
|
if (!token) {
|
||||||
toast.error("You must be logged in to follow users.");
|
toast.error("You must be logged in to follow users.");
|
||||||
@@ -50,6 +63,30 @@ export function RemoteUserProfile({
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const loadFollowers = async (page: number) => {
|
||||||
|
const result = await getActorFollowers(actor.handle, page, token).catch(() => null);
|
||||||
|
if (!result) return;
|
||||||
|
setFollowers((prev) => (page === 1 ? result.items : [...prev, ...result.items]));
|
||||||
|
setFollowersHasMore(result.hasMore);
|
||||||
|
setFollowersLoaded(true);
|
||||||
|
setFollowersPage(page);
|
||||||
|
};
|
||||||
|
|
||||||
|
const loadFollowing = async (page: number) => {
|
||||||
|
const result = await getActorFollowing(actor.handle, page, token).catch(() => null);
|
||||||
|
if (!result) return;
|
||||||
|
setFollowing((prev) => (page === 1 ? result.items : [...prev, ...result.items]));
|
||||||
|
setFollowingHasMore(result.hasMore);
|
||||||
|
setFollowingLoaded(true);
|
||||||
|
setFollowingPage(page);
|
||||||
|
};
|
||||||
|
|
||||||
|
const handleTabChange = (tab: string) => {
|
||||||
|
setActiveTab(tab as ConnectionTab);
|
||||||
|
if (tab === "followers" && !followersLoaded) loadFollowers(1);
|
||||||
|
if (tab === "following" && !followingLoaded) loadFollowing(1);
|
||||||
|
};
|
||||||
|
|
||||||
const isOwnProfile = me?.username === actor.handle;
|
const isOwnProfile = me?.username === actor.handle;
|
||||||
|
|
||||||
const authorDetails = new Map<string, { avatarUrl?: string | null }>();
|
const authorDetails = new Map<string, { avatarUrl?: string | null }>();
|
||||||
@@ -133,31 +170,6 @@ export function RemoteUserProfile({
|
|||||||
</Link>
|
</Link>
|
||||||
</Button>
|
</Button>
|
||||||
|
|
||||||
{(actor.followersUrl || actor.followingUrl) && (
|
|
||||||
<div className="mt-3 flex gap-3 text-sm">
|
|
||||||
{actor.followersUrl && (
|
|
||||||
<Link
|
|
||||||
href={actor.followersUrl}
|
|
||||||
target="_blank"
|
|
||||||
rel="noopener noreferrer"
|
|
||||||
className="text-muted-foreground hover:text-foreground hover:underline"
|
|
||||||
>
|
|
||||||
Followers
|
|
||||||
</Link>
|
|
||||||
)}
|
|
||||||
{actor.followingUrl && (
|
|
||||||
<Link
|
|
||||||
href={actor.followingUrl}
|
|
||||||
target="_blank"
|
|
||||||
rel="noopener noreferrer"
|
|
||||||
className="text-muted-foreground hover:text-foreground hover:underline"
|
|
||||||
>
|
|
||||||
Following
|
|
||||||
</Link>
|
|
||||||
)}
|
|
||||||
</div>
|
|
||||||
)}
|
|
||||||
|
|
||||||
{actor.alsoKnownAs && (
|
{actor.alsoKnownAs && (
|
||||||
<p className="mt-2 text-xs text-muted-foreground">
|
<p className="mt-2 text-xs text-muted-foreground">
|
||||||
Also known as:{" "}
|
Also known as:{" "}
|
||||||
@@ -194,7 +206,15 @@ export function RemoteUserProfile({
|
|||||||
</div>
|
</div>
|
||||||
</aside>
|
</aside>
|
||||||
|
|
||||||
<div className="col-span-1 lg:col-span-3 space-y-4">
|
<div className="col-span-1 lg:col-span-3">
|
||||||
|
<Tabs defaultValue="posts" onValueChange={handleTabChange}>
|
||||||
|
<TabsList>
|
||||||
|
<TabsTrigger value="posts">Posts</TabsTrigger>
|
||||||
|
<TabsTrigger value="followers">Followers</TabsTrigger>
|
||||||
|
<TabsTrigger value="following">Following</TabsTrigger>
|
||||||
|
</TabsList>
|
||||||
|
|
||||||
|
<TabsContent value="posts" className="space-y-4 mt-4">
|
||||||
{initialPosts.length > 0 ? (
|
{initialPosts.length > 0 ? (
|
||||||
<ThoughtList
|
<ThoughtList
|
||||||
thoughts={initialPosts}
|
thoughts={initialPosts}
|
||||||
@@ -208,6 +228,64 @@ export function RemoteUserProfile({
|
|||||||
</p>
|
</p>
|
||||||
</Card>
|
</Card>
|
||||||
)}
|
)}
|
||||||
|
</TabsContent>
|
||||||
|
|
||||||
|
<TabsContent value="followers" className="mt-4">
|
||||||
|
{!followersLoaded ? (
|
||||||
|
<Card className="flex items-center justify-center h-48">
|
||||||
|
<p className="text-center text-muted-foreground">Loading followers…</p>
|
||||||
|
</Card>
|
||||||
|
) : followers.length === 0 ? (
|
||||||
|
<Card className="flex items-center justify-center h-48">
|
||||||
|
<p className="text-center text-muted-foreground">
|
||||||
|
No followers cached yet — check back soon.
|
||||||
|
</p>
|
||||||
|
</Card>
|
||||||
|
) : (
|
||||||
|
<div className="space-y-2">
|
||||||
|
{followers.map((f) => (
|
||||||
|
<RemoteUserCard key={f.url} actor={f} />
|
||||||
|
))}
|
||||||
|
{followersHasMore && (
|
||||||
|
<button
|
||||||
|
onClick={() => loadFollowers(followersPage + 1)}
|
||||||
|
className="w-full text-sm text-muted-foreground hover:text-foreground py-2"
|
||||||
|
>
|
||||||
|
Load more
|
||||||
|
</button>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</TabsContent>
|
||||||
|
|
||||||
|
<TabsContent value="following" className="mt-4">
|
||||||
|
{!followingLoaded ? (
|
||||||
|
<Card className="flex items-center justify-center h-48">
|
||||||
|
<p className="text-center text-muted-foreground">Loading following…</p>
|
||||||
|
</Card>
|
||||||
|
) : following.length === 0 ? (
|
||||||
|
<Card className="flex items-center justify-center h-48">
|
||||||
|
<p className="text-center text-muted-foreground">
|
||||||
|
No following cached yet — check back soon.
|
||||||
|
</p>
|
||||||
|
</Card>
|
||||||
|
) : (
|
||||||
|
<div className="space-y-2">
|
||||||
|
{following.map((f) => (
|
||||||
|
<RemoteUserCard key={f.url} actor={f} />
|
||||||
|
))}
|
||||||
|
{followingHasMore && (
|
||||||
|
<button
|
||||||
|
onClick={() => loadFollowing(followingPage + 1)}
|
||||||
|
className="w-full text-sm text-muted-foreground hover:text-foreground py-2"
|
||||||
|
>
|
||||||
|
Load more
|
||||||
|
</button>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</TabsContent>
|
||||||
|
</Tabs>
|
||||||
</div>
|
</div>
|
||||||
</main>
|
</main>
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
@@ -270,6 +270,44 @@ export const getRemoteActorPosts = (
|
|||||||
token
|
token
|
||||||
);
|
);
|
||||||
|
|
||||||
|
export const ActorConnectionSchema = z.object({
|
||||||
|
handle: z.string(),
|
||||||
|
displayName: z.string().nullable(),
|
||||||
|
avatarUrl: z.string().nullable(),
|
||||||
|
url: z.string(),
|
||||||
|
});
|
||||||
|
export type ActorConnection = z.infer<typeof ActorConnectionSchema>;
|
||||||
|
|
||||||
|
const ActorConnectionPageSchema = z.object({
|
||||||
|
items: z.array(ActorConnectionSchema),
|
||||||
|
page: z.number(),
|
||||||
|
hasMore: z.boolean(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export const getActorFollowers = (
|
||||||
|
handle: string,
|
||||||
|
page: number,
|
||||||
|
token: string | null
|
||||||
|
) =>
|
||||||
|
apiFetch(
|
||||||
|
`/federation/actors/${encodeURIComponent(handle)}/followers-list?page=${page}`,
|
||||||
|
{},
|
||||||
|
ActorConnectionPageSchema,
|
||||||
|
token
|
||||||
|
);
|
||||||
|
|
||||||
|
export const getActorFollowing = (
|
||||||
|
handle: string,
|
||||||
|
page: number,
|
||||||
|
token: string | null
|
||||||
|
) =>
|
||||||
|
apiFetch(
|
||||||
|
`/federation/actors/${encodeURIComponent(handle)}/following-list?page=${page}`,
|
||||||
|
{},
|
||||||
|
ActorConnectionPageSchema,
|
||||||
|
token
|
||||||
|
);
|
||||||
|
|
||||||
export const getAllUsers = (page: number = 1, pageSize: number = 20) =>
|
export const getAllUsers = (page: number = 1, pageSize: number = 20) =>
|
||||||
apiFetch(
|
apiFetch(
|
||||||
`/users?page=${page}&per_page=${pageSize}`,
|
`/users?page=${page}&per_page=${pageSize}`,
|
||||||
|
|||||||
Reference in New Issue
Block a user