use std::sync::Arc;
use domain::ports::FederationFetchPort;
use activitypub_federation::{
activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext,
traits::Actor,
};
use axum::{Router, routing::get, routing::post};
use url::Url;
use crate::{
activities::{
AcceptActivity, CreateActivity, FollowActivity, RejectActivity, UndoActivity,
UpdateActivity,
},
actors::{DbActor, get_local_actor},
content::ApObjectHandler,
data::FederationData,
federation::ApFederationConfig,
inbox::inbox_handler,
nodeinfo::{nodeinfo_handler, nodeinfo_well_known_handler},
outbox::outbox_handler,
repository::{
BlockedDomain, FederationRepository, FollowerStatus, FollowingStatus, RemoteActor,
},
urls::activity_url,
user::ApUserRepository,
webfinger::webfinger_handler,
};
const DELIVERY_MAX_ATTEMPTS: u32 = 3;
const DELIVERY_INITIAL_DELAY_SECS: u64 = 1;
const HTTP_FETCH_TIMEOUT_SECS: u64 = 30;
const BATCH_FETCH_SLEEP_MS: u64 = 100;
fn content_to_html(text: &str) -> String {
let escaped = text
.replace('&', "&")
.replace('<', "<")
.replace('>', ">")
.replace('"', """);
let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect();
if paragraphs.is_empty() {
format!("
{}
", escaped)
} else {
paragraphs
.iter()
.map(|p| format!("{}
", p))
.collect::>()
.join("")
}
}
fn thought_note_json(
thought: &domain::models::thought::Thought,
local_actor: &crate::actors::DbActor,
base_url: &str,
in_reply_to_url: Option<&str>,
) -> anyhow::Result<(url::Url, serde_json::Value)> {
let ap_id = url::Url::parse(&format!("{}/thoughts/{}", base_url, thought.id))?;
// Build to/cc based on visibility per AP spec.
let (to, cc) = match thought.visibility {
domain::models::thought::Visibility::Public => (
vec![crate::urls::AS_PUBLIC.to_string()],
vec![local_actor.followers_url.to_string()],
),
domain::models::thought::Visibility::Unlisted => (
vec![local_actor.followers_url.to_string()],
vec![crate::urls::AS_PUBLIC.to_string()],
),
domain::models::thought::Visibility::Followers => {
(vec![local_actor.followers_url.to_string()], vec![])
}
domain::models::thought::Visibility::Direct => (vec![], vec![]),
};
let mut note = serde_json::json!({
"type": "Note",
"id": ap_id.to_string(),
"url": ap_id.to_string(),
"attributedTo": local_actor.ap_id.to_string(),
"content": content_to_html(thought.content.as_str()),
"published": thought.created_at.to_rfc3339(),
"to": to,
"cc": cc,
"sensitive": thought.sensitive,
});
if let Some(ref cw) = thought.content_warning {
note["summary"] = serde_json::json!(cw);
}
if let Some(reply_url) = in_reply_to_url {
note["inReplyTo"] = serde_json::json!(reply_url);
}
if let Some(updated_at) = thought.updated_at {
note["updated"] = serde_json::json!(updated_at.to_rfc3339());
}
let hashtags = domain::hashtag::extract(thought.content.as_str());
if !hashtags.is_empty() {
let ap_tags: Vec = hashtags
.iter()
.map(|h| {
serde_json::json!({
"type": "Hashtag",
"name": h.ap_name,
"href": format!("{}/{}", base_url, h.url_slug),
})
})
.collect();
note["tag"] = serde_json::json!(ap_tags);
}
Ok((ap_id, note))
}
fn collect_inboxes(followers: &[crate::repository::Follower]) -> Vec {
let mut seen = std::collections::HashSet::new();
let mut inboxes = Vec::new();
for f in followers {
let inbox_str = f
.actor
.shared_inbox_url
.as_deref()
.unwrap_or(&f.actor.inbox_url);
if seen.insert(inbox_str.to_string())
&& let Ok(url) = Url::parse(inbox_str)
{
inboxes.push(url);
}
}
inboxes
}
pub(crate) async fn send_with_retry(
sends: Vec,
data: &activitypub_federation::config::Data,
) -> Vec {
let mut failures = vec![];
for send in sends {
let mut delay = std::time::Duration::from_secs(DELIVERY_INITIAL_DELAY_SECS);
for attempt in 1..=DELIVERY_MAX_ATTEMPTS {
match send.clone().sign_and_send(data).await {
Ok(()) => break,
Err(e) if attempt < DELIVERY_MAX_ATTEMPTS => {
tracing::warn!(attempt, error = %e, "delivery failed, retrying");
tokio::time::sleep(delay).await;
delay *= 2;
}
Err(e) => {
tracing::error!(attempt, error = %e, "delivery failed permanently");
failures.push(anyhow::anyhow!(e));
}
}
}
}
failures
}
#[derive(Clone)]
pub struct ActivityPubService {
federation_config: ApFederationConfig,
base_url: String,
connections_repo: Arc,
}
pub struct ActivityPubServiceBuilder {
repo: Arc,
user_repo: Arc,
object_handler: Arc,
base_url: String,
connections_repo: Arc,
allow_registration: bool,
software_name: String,
debug: bool,
event_publisher: Option>,
}
impl ActivityPubServiceBuilder {
pub fn allow_registration(mut self, v: bool) -> Self {
self.allow_registration = v;
self
}
pub fn software_name(mut self, v: impl Into) -> Self {
self.software_name = v.into();
self
}
pub fn debug(mut self, v: bool) -> Self {
self.debug = v;
self
}
pub fn event_publisher(mut self, v: Arc) -> Self {
self.event_publisher = Some(v);
self
}
pub async fn build(self) -> anyhow::Result {
let data = FederationData::new(
self.repo,
self.user_repo,
self.object_handler,
self.base_url.clone(),
self.allow_registration,
self.software_name,
self.event_publisher,
);
let federation_config = ApFederationConfig::new(data, self.debug).await?;
Ok(ActivityPubService {
federation_config,
base_url: self.base_url,
connections_repo: self.connections_repo,
})
}
}
impl ActivityPubService {
pub fn builder(
repo: Arc,
user_repo: Arc,
object_handler: Arc,
base_url: impl Into,
connections_repo: Arc,
) -> ActivityPubServiceBuilder {
ActivityPubServiceBuilder {
repo,
user_repo,
object_handler,
base_url: base_url.into(),
connections_repo,
allow_registration: false,
software_name: String::new(),
debug: false,
event_publisher: None,
}
}
pub fn federation_config(&self) -> &ApFederationConfig {
&self.federation_config
}
pub fn request_data(&self) -> activitypub_federation::config::Data {
self.federation_config.to_request_data()
}
/// Returns `(local_actor, deduplicated_inboxes)` for all accepted followers,
/// excluding blocked actors and blocked domains.
/// Returns `None` if there are no eligible followers.
async fn accepted_follower_inboxes(
&self,
data: &activitypub_federation::config::Data,
local_user_id: uuid::Uuid,
) -> anyhow::Result