feat(activitypub): implement user outbox endpoint and federate thoughts to followers
This commit is contained in:
@@ -18,12 +18,7 @@ bcrypt = "0.17.1"
|
||||
jsonwebtoken = "9.3.1"
|
||||
once_cell = "1.21.3"
|
||||
|
||||
tower-http = { version = "0.6.6", features = ["fs", "cors"] }
|
||||
tower-cookies = "0.11.0"
|
||||
anyhow = "1.0.98"
|
||||
dotenvy = "0.15.7"
|
||||
activitypub_federation = "0.6.5"
|
||||
url = "2.5.7"
|
||||
tokio = "1.45.1"
|
||||
|
||||
# db
|
||||
sea-orm = { workspace = true }
|
||||
@@ -36,6 +31,13 @@ serde_json = { workspace = true }
|
||||
# local dependencies
|
||||
app = { path = "../app" }
|
||||
models = { path = "../models" }
|
||||
reqwest = { version = "0.12.23", features = ["json"] }
|
||||
|
||||
|
||||
tower-http = { version = "0.6.6", features = ["fs", "cors"] }
|
||||
tower-cookies = "0.11.0"
|
||||
anyhow = "1.0.98"
|
||||
dotenvy = "0.15.7"
|
||||
activitypub_federation = "0.6.5"
|
||||
url = "2.5.7"
|
||||
[dev-dependencies]
|
||||
|
71
thoughts-backend/api/src/federation.rs
Normal file
71
thoughts-backend/api/src/federation.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
use app::{
|
||||
persistence::{follow, user},
|
||||
state::AppState,
|
||||
};
|
||||
use models::domains::thought;
|
||||
use serde_json::json;
|
||||
|
||||
// This function handles pushing a new thought to all followers.
|
||||
pub async fn federate_thought(
|
||||
state: AppState,
|
||||
thought: thought::Model,
|
||||
author: models::domains::user::Model,
|
||||
) {
|
||||
// Find all followers of the author
|
||||
let follower_ids = match follow::get_follower_ids(&state.conn, author.id).await {
|
||||
Ok(ids) => ids,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get followers for federation: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if follower_ids.is_empty() {
|
||||
tracing::debug!("No followers to federate to for user {}", author.username);
|
||||
return;
|
||||
}
|
||||
|
||||
let base_url = "http://localhost:3000"; // Replace in production
|
||||
let thought_url = format!("{}/thoughts/{}", base_url, thought.id);
|
||||
let author_url = format!("{}/users/{}", base_url, author.username);
|
||||
|
||||
// Construct the "Create" activity containing the "Note" object
|
||||
let activity = json!({
|
||||
"@context": "https://www.w3.org/ns/activitystreams",
|
||||
"id": format!("{}/activity", thought_url),
|
||||
"type": "Create",
|
||||
"actor": author_url,
|
||||
"object": {
|
||||
"id": thought_url,
|
||||
"type": "Note",
|
||||
"attributedTo": author_url,
|
||||
"content": thought.content,
|
||||
"published": thought.created_at.to_rfc3339(),
|
||||
"to": ["https://www.w3.org/ns/activitystreams#Public"],
|
||||
"cc": [format!("{}/followers", author_url)]
|
||||
}
|
||||
});
|
||||
|
||||
// Get the inbox URLs for all followers
|
||||
// In a real federated app, you would store remote users' full inbox URLs.
|
||||
// For now, we assume followers are local and construct their inbox URLs.
|
||||
let followers = match user::get_users_by_ids(&state.conn, follower_ids).await {
|
||||
Ok(users) => users,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get follower user objects: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
for follower in followers {
|
||||
let inbox_url = format!("{}/users/{}/inbox", base_url, follower.username);
|
||||
tracing::info!("Federating post {} to {}", thought.id, inbox_url);
|
||||
|
||||
let res = client.post(&inbox_url).json(&activity).send().await;
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::error!("Failed to federate to {}: {}", inbox_url, e);
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,5 +1,6 @@
|
||||
mod error;
|
||||
mod extractor;
|
||||
mod federation;
|
||||
mod init;
|
||||
mod validation;
|
||||
|
||||
|
@@ -16,6 +16,7 @@ use models::{params::thought::CreateThoughtParams, schemas::thought::ThoughtSche
|
||||
use crate::{
|
||||
error::ApiError,
|
||||
extractor::{AuthUser, Json, Valid},
|
||||
federation,
|
||||
models::{ApiErrorResponse, ParamsErrorResponse},
|
||||
};
|
||||
|
||||
@@ -43,6 +44,13 @@ async fn thoughts_post(
|
||||
.await?
|
||||
.ok_or(UserError::NotFound)?; // Should not happen if auth is valid
|
||||
|
||||
// Spawn a background task to handle federation without blocking the response
|
||||
tokio::spawn(federation::federate_thought(
|
||||
state.clone(),
|
||||
thought.clone(),
|
||||
author.clone(),
|
||||
));
|
||||
|
||||
let schema = ThoughtSchema::from_models(&thought, &author);
|
||||
Ok((StatusCode::CREATED, Json(schema)))
|
||||
}
|
||||
|
@@ -252,6 +252,67 @@ async fn get_user_by_param(
|
||||
}
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/{username}/outbox",
|
||||
description = "The ActivityPub outbox for sending activities.",
|
||||
responses(
|
||||
(status = 200, description = "Activity collection", body = Object),
|
||||
(status = 404, description = "User not found")
|
||||
)
|
||||
)]
|
||||
async fn user_outbox_get(
|
||||
State(state): State<AppState>,
|
||||
Path(username): Path<String>,
|
||||
) -> Result<impl IntoResponse, ApiError> {
|
||||
let user = get_user_by_username(&state.conn, &username)
|
||||
.await?
|
||||
.ok_or(UserError::NotFound)?;
|
||||
|
||||
let thoughts = get_thoughts_by_user(&state.conn, user.id).await?;
|
||||
|
||||
// Format the outbox as an ActivityPub OrderedCollection
|
||||
let base_url = "http://localhost:3000";
|
||||
let outbox_url = format!("{}/users/{}/outbox", base_url, username);
|
||||
let items: Vec<Value> = thoughts
|
||||
.into_iter()
|
||||
.map(|thought| {
|
||||
let thought_url = format!("{}/thoughts/{}", base_url, thought.id);
|
||||
let author_url = format!("{}/users/{}", base_url, thought.author_username);
|
||||
json!({
|
||||
"id": format!("{}/activity", thought_url),
|
||||
"type": "Create",
|
||||
"actor": author_url,
|
||||
"published": thought.created_at,
|
||||
"to": ["https://www.w3.org/ns/activitystreams#Public"],
|
||||
"object": {
|
||||
"id": thought_url,
|
||||
"type": "Note",
|
||||
"attributedTo": author_url,
|
||||
"content": thought.content,
|
||||
"published": thought.created_at,
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
let outbox = json!({
|
||||
"@context": "https://www.w3.org/ns/activitystreams",
|
||||
"id": outbox_url,
|
||||
"type": "OrderedCollection",
|
||||
"totalItems": items.len(),
|
||||
"orderedItems": items,
|
||||
});
|
||||
|
||||
let mut headers = axum::http::HeaderMap::new();
|
||||
headers.insert(
|
||||
axum::http::header::CONTENT_TYPE,
|
||||
"application/activity+json".parse().unwrap(),
|
||||
);
|
||||
|
||||
Ok((headers, Json(outbox)))
|
||||
}
|
||||
|
||||
pub fn create_user_router() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", get(users_get))
|
||||
@@ -262,4 +323,5 @@ pub fn create_user_router() -> Router<AppState> {
|
||||
post(user_follow_post).delete(user_follow_delete),
|
||||
)
|
||||
.route("/{username}/inbox", post(user_inbox_post))
|
||||
.route("/{username}/outbox", get(user_outbox_get))
|
||||
}
|
||||
|
Reference in New Issue
Block a user