Compare commits

...

8 Commits

Author SHA1 Message Date
b58c96b843 feat: implement federation post/connections backfill schedulers
Some checks failed
lint / lint (push) Failing after 5m12s
test / integration (push) Has been cancelled
test / unit (push) Has been cancelled
schedule_actor_posts_fetch now spawns backfill_outbox in background,
fetching all pages of a remote outbox and persisting via accept_note.
schedule_connections_fetch follows AP collection next-links, resolves
profiles, and caches them in the DB. Both were no-ops ("deferred").

Add connections_repo field to ActivityPubService; wire both factories.
2026-05-17 11:49:53 +02:00
8ea24461ba feat: load more pagination for user profile thoughts 2026-05-16 15:21:18 +02:00
e14a9f90c8 fix: route local users to /users/{username} in remote connection lists 2026-05-16 15:17:58 +02:00
28756ef4cd feat: load more pagination for remote user posts 2026-05-16 15:14:53 +02:00
7f27ae49c3 fix: overflow-y scroll on html to prevent layout shift on dropdown open 2026-05-16 15:12:41 +02:00
59f3423c00 fix: break-all on fediverse handle to prevent overflow 2026-05-16 15:07:30 +02:00
c48aa33592 fix: scrollbar-gutter stable to prevent bg flicker on dropdown open 2026-05-16 15:05:28 +02:00
8f3aa4b891 fix: wrap background image in fixed div so it stays put on scroll 2026-05-16 15:03:41 +02:00
11 changed files with 277 additions and 44 deletions

View File

@@ -1,5 +1,7 @@
use std::sync::Arc;
use domain::ports::FederationFetchPort;
use activitypub_federation::{
activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext,
traits::Actor,
@@ -154,9 +156,11 @@ pub(crate) async fn send_with_retry(
failures
}
#[derive(Clone)]
pub struct ActivityPubService {
federation_config: ApFederationConfig,
base_url: String,
connections_repo: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
}
impl ActivityPubService {
@@ -170,6 +174,7 @@ impl ActivityPubService {
software_name: String,
debug: bool,
event_publisher: Option<Arc<dyn domain::ports::EventPublisher>>,
connections_repo: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
) -> anyhow::Result<Self> {
let data = FederationData::new(
repo,
@@ -184,6 +189,7 @@ impl ActivityPubService {
Ok(Self {
federation_config,
base_url,
connections_repo,
})
}
@@ -1586,11 +1592,14 @@ impl domain::ports::FederationSchedulerPort for ActivityPubService {
actor_ap_url: &str,
outbox_url: &str,
) -> Result<(), domain::errors::DomainError> {
tracing::debug!(
actor = actor_ap_url,
outbox = outbox_url,
"schedule_actor_posts_fetch: deferred"
);
let service = self.clone();
let actor = actor_ap_url.to_string();
let outbox = outbox_url.to_string();
tokio::spawn(async move {
if let Err(e) = service.backfill_outbox(&outbox, &actor).await {
tracing::warn!(actor = %actor, error = %e, "posts backfill failed");
}
});
Ok(())
}
@@ -1601,13 +1610,107 @@ impl domain::ports::FederationSchedulerPort for ActivityPubService {
connection_type: &str,
page: u32,
) -> Result<(), domain::errors::DomainError> {
tracing::debug!(
actor = actor_ap_url,
collection = collection_url,
connection_type,
page,
"schedule_connections_fetch: deferred"
);
// Only trigger a full fetch on page 1 to avoid redundant network traffic.
if page != 1 {
return Ok(());
}
let service = self.clone();
let actor = actor_ap_url.to_string();
let collection = collection_url.to_string();
let conn_type = connection_type.to_string();
let connections_repo = self.connections_repo.clone();
tokio::spawn(async move {
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS))
.build()
{
Ok(c) => c,
Err(e) => {
tracing::warn!(error = %e, "connections fetch: failed to build client");
return;
}
};
// Walk the AP collection, following first/next links.
let mut all_urls: Vec<String> = Vec::new();
let mut current_url: Option<String> = Some(collection.clone());
const MAX_ACTORS: usize = 500;
while let Some(url) = current_url.take() {
let val: serde_json::Value = match client
.get(&url)
.header("Accept", "application/activity+json, application/ld+json")
.send()
.await
{
Ok(r) => match r.json().await {
Ok(v) => v,
Err(e) => {
tracing::warn!(error = %e, url = %url, "connections: parse error");
break;
}
},
Err(e) => {
tracing::warn!(error = %e, url = %url, "connections: HTTP error");
break;
}
};
// OrderedCollection root — follow its `first` page.
if val["type"].as_str() == Some("OrderedCollection") {
current_url = val["first"].as_str().map(|s| s.to_string());
continue;
}
// Collect actor URLs from orderedItems (string or {id: ...}).
let empty = vec![];
let items = val["orderedItems"].as_array().unwrap_or(&empty);
for item in items {
let actor_url = item
.as_str()
.or_else(|| item["id"].as_str())
.unwrap_or("");
if !actor_url.is_empty() {
all_urls.push(actor_url.to_string());
}
}
if all_urls.len() >= MAX_ACTORS {
break;
}
current_url = val["next"].as_str().map(|s| s.to_string());
if current_url.is_some() {
tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS))
.await;
}
}
if all_urls.is_empty() {
tracing::debug!(actor = %actor, connection_type = %conn_type, "connections: empty collection");
return;
}
// Resolve profiles and cache in pages of PAGE_SIZE.
const PAGE_SIZE: usize = 20;
for (idx, chunk) in all_urls.chunks(PAGE_SIZE).enumerate() {
let page_num = (idx + 1) as u32;
let chunk_urls: Vec<String> = chunk.to_vec();
let resolved = service.resolve_actor_profiles(chunk_urls).await;
if let Err(e) = connections_repo
.upsert_connections(&actor, &conn_type, page_num, &resolved)
.await
{
tracing::warn!(error = %e, "connections: upsert failed");
}
}
tracing::debug!(
actor = %actor,
connection_type = %conn_type,
count = all_urls.len(),
"connections fetch complete"
);
});
Ok(())
}
}

View File

@@ -86,6 +86,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
"thoughts".to_string(),
cfg.debug,
None,
Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
)
.await
.expect("Failed to build ActivityPubService"),

View File

@@ -1,4 +1,5 @@
use postgres::failed_event::PgFailedEventStore;
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use sqlx::PgPool;
use std::sync::Arc;
@@ -56,6 +57,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
"thoughts".to_string(),
false,
None,
Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
)
.await
.expect("ActivityPubService build failed"),

View File

@@ -177,6 +177,10 @@
}
@layer base {
html {
overflow-y: scroll;
}
* {
@apply border-border outline-ring/50;
}

View File

@@ -53,14 +53,16 @@ export default function RootLayout({
return (
<html lang="en">
<body className={`${frutiger.className} antialiased`}>
<Image
src="/bg1.avif"
alt=""
fill
priority
quality={85}
className="fixed inset-0 -z-10 object-cover object-center"
/>
<div className="fixed inset-0 -z-10">
<Image
src="/bg1.avif"
alt=""
fill
priority
quality={85}
className="object-cover object-center"
/>
</div>
<AuthProvider>
<Header />
<main className="flex-1">{children}</main>

View File

@@ -65,8 +65,11 @@ export default async function RemoteActorPage({
}
const actor = actorResult.value;
const posts =
postsResult.status === "fulfilled" ? postsResult.value.items : [];
const postsData = postsResult.status === "fulfilled" ? postsResult.value : null;
const posts = postsData?.items ?? [];
const totalPages = postsData
? Math.ceil(postsData.total / postsData.per_page)
: 1;
const me =
meResult.status === "fulfilled" ? (meResult.value as Me | null) : null;
const following =
@@ -77,7 +80,9 @@ export default async function RemoteActorPage({
<RemoteUserProfile
key={actor.url}
actor={actor}
handle={handle}
initialPosts={posts}
initialTotalPages={totalPages}
me={me}
initialFollowed={initialFollowed}
/>

View File

@@ -53,8 +53,7 @@ import { FollowButton } from "@/components/follow-button";
import { TopFriends } from "@/components/top-friends";
import { Suspense } from "react";
import { ProfileSkeleton } from "@/components/loading-skeleton";
import { buildThoughtThreads } from "@/lib/utils";
import { ThoughtThread } from "@/components/thought-thread";
import { UserThoughtsList } from "@/components/user-thoughts-list";
import { Button } from "@/components/ui/button";
import Link from "next/link";
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
@@ -95,9 +94,11 @@ export default async function ProfilePage({ params }: ProfilePageProps) {
const user = userResult.value;
const me = meResult.status === "fulfilled" ? (meResult.value as Me) : null;
const thoughts =
thoughtsResult.status === "fulfilled" ? thoughtsResult.value.items : [];
const thoughtThreads = buildThoughtThreads(thoughts);
const thoughtsData = thoughtsResult.status === "fulfilled" ? thoughtsResult.value : null;
const thoughts = thoughtsData?.items ?? [];
const totalPages = thoughtsData
? Math.ceil(thoughtsData.total / thoughtsData.per_page)
: 1;
const localFollowersCount =
followersResult.status === "fulfilled"
@@ -194,7 +195,7 @@ export default async function ProfilePage({ params }: ProfilePageProps) {
@{user.username}
</p>
{fediverseHandle && (
<p className="text-xs text-muted-foreground/70 mt-0.5 font-mono select-all">
<p className="text-xs text-muted-foreground/70 mt-0.5 font-mono select-all break-all">
{fediverseHandle}
</p>
)}
@@ -262,16 +263,12 @@ export default async function ProfilePage({ params }: ProfilePageProps) {
)}
</TabsList>
<TabsContent value="thoughts" className="space-y-4">
{thoughtThreads.map((thought) => (
<ThoughtThread
key={thought.id}
thought={thought}
currentUser={me}
/>
))}
{thoughtThreads.length === 0 && (
<EmptyState emoji="💭" title="Nothing here yet" message="This user hasn't posted any public thoughts yet." />
)}
<UserThoughtsList
username={username}
initialThoughts={thoughts}
totalPages={totalPages}
me={me}
/>
</TabsContent>
{isOwnProfile && (
<TabsContent value="federation">

View File

@@ -18,6 +18,19 @@ interface RemoteUserCardProps {
};
}
function resolveProfileHref(handle: string): string {
const apiDomain = process.env.NEXT_PUBLIC_API_URL
? new URL(process.env.NEXT_PUBLIC_API_URL).hostname
: null;
const clean = handle.startsWith("@") ? handle.slice(1) : handle;
const atIdx = clean.indexOf("@");
const domain = atIdx !== -1 ? clean.slice(atIdx + 1) : null;
const username = atIdx !== -1 ? clean.slice(0, atIdx) : clean;
return apiDomain && domain === apiDomain
? `/users/${username}`
: `/remote-actor?handle=@${clean}`;
}
export function RemoteUserCard({ actor }: RemoteUserCardProps) {
const [followed, setFollowed] = useState(false);
const [loading, setLoading] = useState(false);
@@ -43,7 +56,7 @@ export function RemoteUserCard({ actor }: RemoteUserCardProps) {
return (
<div className="flex items-center justify-between p-4 border rounded-lg">
<Link
href={`/users/@${actor.handle}`}
href={resolveProfileHref(actor.handle)}
className="flex items-center gap-3 hover:opacity-80"
>
<UserAvatar src={actor.avatarUrl} alt={actor.displayName ?? actor.handle} />

View File

@@ -2,7 +2,7 @@
import { useState } from "react";
import { UserMinus, UserPlus } from "lucide-react";
import { followUser, unfollowUser, RemoteActor, Thought, Me } from "@/lib/api";
import { followUser, unfollowUser, getRemoteActorPosts, RemoteActor, Thought, Me } from "@/lib/api";
import { Card } from "@/components/ui/card";
import { Button } from "@/components/ui/button";
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
@@ -14,14 +14,18 @@ import { Connections } from "./connections";
interface RemoteUserProfileProps {
actor: RemoteActor;
handle: string;
initialPosts: Thought[];
initialTotalPages: number;
me: Me | null;
initialFollowed?: boolean;
}
export function RemoteUserProfile({
actor,
handle,
initialPosts,
initialTotalPages,
me,
initialFollowed = false,
}: RemoteUserProfileProps) {
@@ -29,6 +33,24 @@ export function RemoteUserProfile({
const [followLoading, setFollowLoading] = useState(false);
const { token } = useAuth();
const [posts, setPosts] = useState<Thought[]>(initialPosts);
const [page, setPage] = useState(1);
const [totalPages] = useState(initialTotalPages);
const [loadingMore, setLoadingMore] = useState(false);
const loadMore = async () => {
setLoadingMore(true);
try {
const result = await getRemoteActorPosts(handle, page + 1, token);
setPosts((prev) => [...prev, ...result.items]);
setPage((p) => p + 1);
} catch {
toast.error("Failed to load more posts.");
} finally {
setLoadingMore(false);
}
};
const [followersActive, setFollowersActive] = useState(false);
const [followingActive, setFollowingActive] = useState(false);
@@ -108,8 +130,20 @@ export function RemoteUserProfile({
</TabsList>
<TabsContent value="posts" className="space-y-4 mt-4">
{initialPosts.length > 0 ? (
<ThoughtList thoughts={initialPosts} currentUser={me} />
{posts.length > 0 ? (
<>
<ThoughtList thoughts={posts} currentUser={me} />
{page < totalPages && (
<Button
onClick={loadMore}
disabled={loadingMore}
variant="outline"
className="w-full rounded-full"
>
{loadingMore ? "Loading…" : "Load more"}
</Button>
)}
</>
) : (
<Card className="flex items-center justify-center h-48">
<p className="text-center text-muted-foreground">

View File

@@ -0,0 +1,72 @@
"use client";
import { useState } from "react";
import { getUserThoughts, Me, Thought } from "@/lib/api";
import { ThoughtThread } from "@/components/thought-thread";
import { Button } from "@/components/ui/button";
import { EmptyState } from "@/components/empty-state";
import { buildThoughtThreads } from "@/lib/utils";
import { toast } from "sonner";
import { useAuth } from "@/hooks/use-auth";
interface UserThoughtsListProps {
username: string;
initialThoughts: Thought[];
totalPages: number;
me: Me | null;
}
export function UserThoughtsList({
username,
initialThoughts,
totalPages,
me,
}: UserThoughtsListProps) {
const [thoughts, setThoughts] = useState<Thought[]>(initialThoughts);
const [page, setPage] = useState(1);
const [loadingMore, setLoadingMore] = useState(false);
const { token } = useAuth();
const thoughtThreads = buildThoughtThreads(thoughts);
const loadMore = async () => {
setLoadingMore(true);
try {
const result = await getUserThoughts(username, token, page + 1);
setThoughts((prev) => [...prev, ...result.items]);
setPage((p) => p + 1);
} catch {
toast.error("Failed to load more thoughts.");
} finally {
setLoadingMore(false);
}
};
if (thoughtThreads.length === 0) {
return (
<EmptyState
emoji="💭"
title="Nothing here yet"
message="This user hasn't posted any public thoughts yet."
/>
);
}
return (
<div className="space-y-4">
{thoughtThreads.map((thought) => (
<ThoughtThread key={thought.id} thought={thought} currentUser={me} />
))}
{page < totalPages && (
<Button
onClick={loadMore}
disabled={loadingMore}
variant="outline"
className="w-full rounded-full"
>
{loadingMore ? "Loading…" : "Load more"}
</Button>
)}
</div>
);
}

View File

@@ -343,9 +343,9 @@ export const getFeed = (token: string, page: number = 1, pageSize: number = 20)
token
);
export const getUserThoughts = (username: string, token: string | null) =>
export const getUserThoughts = (username: string, token: string | null, page = 1) =>
apiFetch(
`/users/${username}/thoughts`,
`/users/${username}/thoughts?page=${page}`,
{ next: { tags: [`profile:${username}`] } },
z.object({ items: z.array(ThoughtSchema), total: z.number(), page: z.number(), per_page: z.number() }),
token