Compare commits
8 Commits
32bfb00970
...
b58c96b843
| Author | SHA1 | Date | |
|---|---|---|---|
| b58c96b843 | |||
| 8ea24461ba | |||
| e14a9f90c8 | |||
| 28756ef4cd | |||
| 7f27ae49c3 | |||
| 59f3423c00 | |||
| c48aa33592 | |||
| 8f3aa4b891 |
@@ -1,5 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use domain::ports::FederationFetchPort;
|
||||
|
||||
use activitypub_federation::{
|
||||
activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext,
|
||||
traits::Actor,
|
||||
@@ -154,9 +156,11 @@ pub(crate) async fn send_with_retry(
|
||||
failures
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ActivityPubService {
|
||||
federation_config: ApFederationConfig,
|
||||
base_url: String,
|
||||
connections_repo: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
|
||||
}
|
||||
|
||||
impl ActivityPubService {
|
||||
@@ -170,6 +174,7 @@ impl ActivityPubService {
|
||||
software_name: String,
|
||||
debug: bool,
|
||||
event_publisher: Option<Arc<dyn domain::ports::EventPublisher>>,
|
||||
connections_repo: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let data = FederationData::new(
|
||||
repo,
|
||||
@@ -184,6 +189,7 @@ impl ActivityPubService {
|
||||
Ok(Self {
|
||||
federation_config,
|
||||
base_url,
|
||||
connections_repo,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1586,11 +1592,14 @@ impl domain::ports::FederationSchedulerPort for ActivityPubService {
|
||||
actor_ap_url: &str,
|
||||
outbox_url: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
tracing::debug!(
|
||||
actor = actor_ap_url,
|
||||
outbox = outbox_url,
|
||||
"schedule_actor_posts_fetch: deferred"
|
||||
);
|
||||
let service = self.clone();
|
||||
let actor = actor_ap_url.to_string();
|
||||
let outbox = outbox_url.to_string();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = service.backfill_outbox(&outbox, &actor).await {
|
||||
tracing::warn!(actor = %actor, error = %e, "posts backfill failed");
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1601,13 +1610,107 @@ impl domain::ports::FederationSchedulerPort for ActivityPubService {
|
||||
connection_type: &str,
|
||||
page: u32,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
tracing::debug!(
|
||||
actor = actor_ap_url,
|
||||
collection = collection_url,
|
||||
connection_type,
|
||||
page,
|
||||
"schedule_connections_fetch: deferred"
|
||||
);
|
||||
// Only trigger a full fetch on page 1 to avoid redundant network traffic.
|
||||
if page != 1 {
|
||||
return Ok(());
|
||||
}
|
||||
let service = self.clone();
|
||||
let actor = actor_ap_url.to_string();
|
||||
let collection = collection_url.to_string();
|
||||
let conn_type = connection_type.to_string();
|
||||
let connections_repo = self.connections_repo.clone();
|
||||
tokio::spawn(async move {
|
||||
let client = match reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS))
|
||||
.build()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "connections fetch: failed to build client");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Walk the AP collection, following first/next links.
|
||||
let mut all_urls: Vec<String> = Vec::new();
|
||||
let mut current_url: Option<String> = Some(collection.clone());
|
||||
const MAX_ACTORS: usize = 500;
|
||||
|
||||
while let Some(url) = current_url.take() {
|
||||
let val: serde_json::Value = match client
|
||||
.get(&url)
|
||||
.header("Accept", "application/activity+json, application/ld+json")
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(r) => match r.json().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, url = %url, "connections: parse error");
|
||||
break;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, url = %url, "connections: HTTP error");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
// OrderedCollection root — follow its `first` page.
|
||||
if val["type"].as_str() == Some("OrderedCollection") {
|
||||
current_url = val["first"].as_str().map(|s| s.to_string());
|
||||
continue;
|
||||
}
|
||||
|
||||
// Collect actor URLs from orderedItems (string or {id: ...}).
|
||||
let empty = vec![];
|
||||
let items = val["orderedItems"].as_array().unwrap_or(&empty);
|
||||
for item in items {
|
||||
let actor_url = item
|
||||
.as_str()
|
||||
.or_else(|| item["id"].as_str())
|
||||
.unwrap_or("");
|
||||
if !actor_url.is_empty() {
|
||||
all_urls.push(actor_url.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
if all_urls.len() >= MAX_ACTORS {
|
||||
break;
|
||||
}
|
||||
current_url = val["next"].as_str().map(|s| s.to_string());
|
||||
if current_url.is_some() {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
if all_urls.is_empty() {
|
||||
tracing::debug!(actor = %actor, connection_type = %conn_type, "connections: empty collection");
|
||||
return;
|
||||
}
|
||||
|
||||
// Resolve profiles and cache in pages of PAGE_SIZE.
|
||||
const PAGE_SIZE: usize = 20;
|
||||
for (idx, chunk) in all_urls.chunks(PAGE_SIZE).enumerate() {
|
||||
let page_num = (idx + 1) as u32;
|
||||
let chunk_urls: Vec<String> = chunk.to_vec();
|
||||
let resolved = service.resolve_actor_profiles(chunk_urls).await;
|
||||
if let Err(e) = connections_repo
|
||||
.upsert_connections(&actor, &conn_type, page_num, &resolved)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, "connections: upsert failed");
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
actor = %actor,
|
||||
connection_type = %conn_type,
|
||||
count = all_urls.len(),
|
||||
"connections fetch complete"
|
||||
);
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,6 +86,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
|
||||
"thoughts".to_string(),
|
||||
cfg.debug,
|
||||
None,
|
||||
Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
|
||||
)
|
||||
.await
|
||||
.expect("Failed to build ActivityPubService"),
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use postgres::failed_event::PgFailedEventStore;
|
||||
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -56,6 +57,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
||||
"thoughts".to_string(),
|
||||
false,
|
||||
None,
|
||||
Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
|
||||
)
|
||||
.await
|
||||
.expect("ActivityPubService build failed"),
|
||||
|
||||
@@ -177,6 +177,10 @@
|
||||
}
|
||||
|
||||
@layer base {
|
||||
html {
|
||||
overflow-y: scroll;
|
||||
}
|
||||
|
||||
* {
|
||||
@apply border-border outline-ring/50;
|
||||
}
|
||||
|
||||
@@ -53,14 +53,16 @@ export default function RootLayout({
|
||||
return (
|
||||
<html lang="en">
|
||||
<body className={`${frutiger.className} antialiased`}>
|
||||
<Image
|
||||
src="/bg1.avif"
|
||||
alt=""
|
||||
fill
|
||||
priority
|
||||
quality={85}
|
||||
className="fixed inset-0 -z-10 object-cover object-center"
|
||||
/>
|
||||
<div className="fixed inset-0 -z-10">
|
||||
<Image
|
||||
src="/bg1.avif"
|
||||
alt=""
|
||||
fill
|
||||
priority
|
||||
quality={85}
|
||||
className="object-cover object-center"
|
||||
/>
|
||||
</div>
|
||||
<AuthProvider>
|
||||
<Header />
|
||||
<main className="flex-1">{children}</main>
|
||||
|
||||
@@ -65,8 +65,11 @@ export default async function RemoteActorPage({
|
||||
}
|
||||
|
||||
const actor = actorResult.value;
|
||||
const posts =
|
||||
postsResult.status === "fulfilled" ? postsResult.value.items : [];
|
||||
const postsData = postsResult.status === "fulfilled" ? postsResult.value : null;
|
||||
const posts = postsData?.items ?? [];
|
||||
const totalPages = postsData
|
||||
? Math.ceil(postsData.total / postsData.per_page)
|
||||
: 1;
|
||||
const me =
|
||||
meResult.status === "fulfilled" ? (meResult.value as Me | null) : null;
|
||||
const following =
|
||||
@@ -77,7 +80,9 @@ export default async function RemoteActorPage({
|
||||
<RemoteUserProfile
|
||||
key={actor.url}
|
||||
actor={actor}
|
||||
handle={handle}
|
||||
initialPosts={posts}
|
||||
initialTotalPages={totalPages}
|
||||
me={me}
|
||||
initialFollowed={initialFollowed}
|
||||
/>
|
||||
|
||||
@@ -53,8 +53,7 @@ import { FollowButton } from "@/components/follow-button";
|
||||
import { TopFriends } from "@/components/top-friends";
|
||||
import { Suspense } from "react";
|
||||
import { ProfileSkeleton } from "@/components/loading-skeleton";
|
||||
import { buildThoughtThreads } from "@/lib/utils";
|
||||
import { ThoughtThread } from "@/components/thought-thread";
|
||||
import { UserThoughtsList } from "@/components/user-thoughts-list";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import Link from "next/link";
|
||||
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
|
||||
@@ -95,9 +94,11 @@ export default async function ProfilePage({ params }: ProfilePageProps) {
|
||||
const user = userResult.value;
|
||||
const me = meResult.status === "fulfilled" ? (meResult.value as Me) : null;
|
||||
|
||||
const thoughts =
|
||||
thoughtsResult.status === "fulfilled" ? thoughtsResult.value.items : [];
|
||||
const thoughtThreads = buildThoughtThreads(thoughts);
|
||||
const thoughtsData = thoughtsResult.status === "fulfilled" ? thoughtsResult.value : null;
|
||||
const thoughts = thoughtsData?.items ?? [];
|
||||
const totalPages = thoughtsData
|
||||
? Math.ceil(thoughtsData.total / thoughtsData.per_page)
|
||||
: 1;
|
||||
|
||||
const localFollowersCount =
|
||||
followersResult.status === "fulfilled"
|
||||
@@ -194,7 +195,7 @@ export default async function ProfilePage({ params }: ProfilePageProps) {
|
||||
@{user.username}
|
||||
</p>
|
||||
{fediverseHandle && (
|
||||
<p className="text-xs text-muted-foreground/70 mt-0.5 font-mono select-all">
|
||||
<p className="text-xs text-muted-foreground/70 mt-0.5 font-mono select-all break-all">
|
||||
{fediverseHandle}
|
||||
</p>
|
||||
)}
|
||||
@@ -262,16 +263,12 @@ export default async function ProfilePage({ params }: ProfilePageProps) {
|
||||
)}
|
||||
</TabsList>
|
||||
<TabsContent value="thoughts" className="space-y-4">
|
||||
{thoughtThreads.map((thought) => (
|
||||
<ThoughtThread
|
||||
key={thought.id}
|
||||
thought={thought}
|
||||
currentUser={me}
|
||||
/>
|
||||
))}
|
||||
{thoughtThreads.length === 0 && (
|
||||
<EmptyState emoji="💭" title="Nothing here yet" message="This user hasn't posted any public thoughts yet." />
|
||||
)}
|
||||
<UserThoughtsList
|
||||
username={username}
|
||||
initialThoughts={thoughts}
|
||||
totalPages={totalPages}
|
||||
me={me}
|
||||
/>
|
||||
</TabsContent>
|
||||
{isOwnProfile && (
|
||||
<TabsContent value="federation">
|
||||
|
||||
@@ -18,6 +18,19 @@ interface RemoteUserCardProps {
|
||||
};
|
||||
}
|
||||
|
||||
function resolveProfileHref(handle: string): string {
|
||||
const apiDomain = process.env.NEXT_PUBLIC_API_URL
|
||||
? new URL(process.env.NEXT_PUBLIC_API_URL).hostname
|
||||
: null;
|
||||
const clean = handle.startsWith("@") ? handle.slice(1) : handle;
|
||||
const atIdx = clean.indexOf("@");
|
||||
const domain = atIdx !== -1 ? clean.slice(atIdx + 1) : null;
|
||||
const username = atIdx !== -1 ? clean.slice(0, atIdx) : clean;
|
||||
return apiDomain && domain === apiDomain
|
||||
? `/users/${username}`
|
||||
: `/remote-actor?handle=@${clean}`;
|
||||
}
|
||||
|
||||
export function RemoteUserCard({ actor }: RemoteUserCardProps) {
|
||||
const [followed, setFollowed] = useState(false);
|
||||
const [loading, setLoading] = useState(false);
|
||||
@@ -43,7 +56,7 @@ export function RemoteUserCard({ actor }: RemoteUserCardProps) {
|
||||
return (
|
||||
<div className="flex items-center justify-between p-4 border rounded-lg">
|
||||
<Link
|
||||
href={`/users/@${actor.handle}`}
|
||||
href={resolveProfileHref(actor.handle)}
|
||||
className="flex items-center gap-3 hover:opacity-80"
|
||||
>
|
||||
<UserAvatar src={actor.avatarUrl} alt={actor.displayName ?? actor.handle} />
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import { useState } from "react";
|
||||
import { UserMinus, UserPlus } from "lucide-react";
|
||||
import { followUser, unfollowUser, RemoteActor, Thought, Me } from "@/lib/api";
|
||||
import { followUser, unfollowUser, getRemoteActorPosts, RemoteActor, Thought, Me } from "@/lib/api";
|
||||
import { Card } from "@/components/ui/card";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
|
||||
@@ -14,14 +14,18 @@ import { Connections } from "./connections";
|
||||
|
||||
interface RemoteUserProfileProps {
|
||||
actor: RemoteActor;
|
||||
handle: string;
|
||||
initialPosts: Thought[];
|
||||
initialTotalPages: number;
|
||||
me: Me | null;
|
||||
initialFollowed?: boolean;
|
||||
}
|
||||
|
||||
export function RemoteUserProfile({
|
||||
actor,
|
||||
handle,
|
||||
initialPosts,
|
||||
initialTotalPages,
|
||||
me,
|
||||
initialFollowed = false,
|
||||
}: RemoteUserProfileProps) {
|
||||
@@ -29,6 +33,24 @@ export function RemoteUserProfile({
|
||||
const [followLoading, setFollowLoading] = useState(false);
|
||||
const { token } = useAuth();
|
||||
|
||||
const [posts, setPosts] = useState<Thought[]>(initialPosts);
|
||||
const [page, setPage] = useState(1);
|
||||
const [totalPages] = useState(initialTotalPages);
|
||||
const [loadingMore, setLoadingMore] = useState(false);
|
||||
|
||||
const loadMore = async () => {
|
||||
setLoadingMore(true);
|
||||
try {
|
||||
const result = await getRemoteActorPosts(handle, page + 1, token);
|
||||
setPosts((prev) => [...prev, ...result.items]);
|
||||
setPage((p) => p + 1);
|
||||
} catch {
|
||||
toast.error("Failed to load more posts.");
|
||||
} finally {
|
||||
setLoadingMore(false);
|
||||
}
|
||||
};
|
||||
|
||||
const [followersActive, setFollowersActive] = useState(false);
|
||||
const [followingActive, setFollowingActive] = useState(false);
|
||||
|
||||
@@ -108,8 +130,20 @@ export function RemoteUserProfile({
|
||||
</TabsList>
|
||||
|
||||
<TabsContent value="posts" className="space-y-4 mt-4">
|
||||
{initialPosts.length > 0 ? (
|
||||
<ThoughtList thoughts={initialPosts} currentUser={me} />
|
||||
{posts.length > 0 ? (
|
||||
<>
|
||||
<ThoughtList thoughts={posts} currentUser={me} />
|
||||
{page < totalPages && (
|
||||
<Button
|
||||
onClick={loadMore}
|
||||
disabled={loadingMore}
|
||||
variant="outline"
|
||||
className="w-full rounded-full"
|
||||
>
|
||||
{loadingMore ? "Loading…" : "Load more"}
|
||||
</Button>
|
||||
)}
|
||||
</>
|
||||
) : (
|
||||
<Card className="flex items-center justify-center h-48">
|
||||
<p className="text-center text-muted-foreground">
|
||||
|
||||
72
thoughts-frontend/components/user-thoughts-list.tsx
Normal file
72
thoughts-frontend/components/user-thoughts-list.tsx
Normal file
@@ -0,0 +1,72 @@
|
||||
"use client";
|
||||
|
||||
import { useState } from "react";
|
||||
import { getUserThoughts, Me, Thought } from "@/lib/api";
|
||||
import { ThoughtThread } from "@/components/thought-thread";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { EmptyState } from "@/components/empty-state";
|
||||
import { buildThoughtThreads } from "@/lib/utils";
|
||||
import { toast } from "sonner";
|
||||
import { useAuth } from "@/hooks/use-auth";
|
||||
|
||||
interface UserThoughtsListProps {
|
||||
username: string;
|
||||
initialThoughts: Thought[];
|
||||
totalPages: number;
|
||||
me: Me | null;
|
||||
}
|
||||
|
||||
export function UserThoughtsList({
|
||||
username,
|
||||
initialThoughts,
|
||||
totalPages,
|
||||
me,
|
||||
}: UserThoughtsListProps) {
|
||||
const [thoughts, setThoughts] = useState<Thought[]>(initialThoughts);
|
||||
const [page, setPage] = useState(1);
|
||||
const [loadingMore, setLoadingMore] = useState(false);
|
||||
const { token } = useAuth();
|
||||
|
||||
const thoughtThreads = buildThoughtThreads(thoughts);
|
||||
|
||||
const loadMore = async () => {
|
||||
setLoadingMore(true);
|
||||
try {
|
||||
const result = await getUserThoughts(username, token, page + 1);
|
||||
setThoughts((prev) => [...prev, ...result.items]);
|
||||
setPage((p) => p + 1);
|
||||
} catch {
|
||||
toast.error("Failed to load more thoughts.");
|
||||
} finally {
|
||||
setLoadingMore(false);
|
||||
}
|
||||
};
|
||||
|
||||
if (thoughtThreads.length === 0) {
|
||||
return (
|
||||
<EmptyState
|
||||
emoji="💭"
|
||||
title="Nothing here yet"
|
||||
message="This user hasn't posted any public thoughts yet."
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<div className="space-y-4">
|
||||
{thoughtThreads.map((thought) => (
|
||||
<ThoughtThread key={thought.id} thought={thought} currentUser={me} />
|
||||
))}
|
||||
{page < totalPages && (
|
||||
<Button
|
||||
onClick={loadMore}
|
||||
disabled={loadingMore}
|
||||
variant="outline"
|
||||
className="w-full rounded-full"
|
||||
>
|
||||
{loadingMore ? "Loading…" : "Load more"}
|
||||
</Button>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -343,9 +343,9 @@ export const getFeed = (token: string, page: number = 1, pageSize: number = 20)
|
||||
token
|
||||
);
|
||||
|
||||
export const getUserThoughts = (username: string, token: string | null) =>
|
||||
export const getUserThoughts = (username: string, token: string | null, page = 1) =>
|
||||
apiFetch(
|
||||
`/users/${username}/thoughts`,
|
||||
`/users/${username}/thoughts?page=${page}`,
|
||||
{ next: { tags: [`profile:${username}`] } },
|
||||
z.object({ items: z.array(ThoughtSchema), total: z.number(), page: z.number(), per_page: z.number() }),
|
||||
token
|
||||
|
||||
Reference in New Issue
Block a user