Refactor database error handling across repositories to use IntoDbResult for improved error management
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m30s
test / unit (pull_request) Successful in 16m10s
test / integration (pull_request) Failing after 16m44s

- Updated PgNotificationRepository to utilize IntoDbResult for error handling in various methods.
- Refactored PgRemoteActorRepository to replace manual error mapping with IntoDbResult.
- Modified PgRemoteActorConnectionRepository to implement IntoDbResult for error handling.
- Adjusted PgTagRepository to use IntoDbResult for consistent error management.
- Introduced test_helpers module for seeding users and thoughts in tests.
- Enhanced PgThoughtRepository to leverage IntoDbResult for error handling.
- Updated PgTopFriendRepository to utilize IntoDbResult for error management.
- Refactored PgUserRepository to implement IntoDbResult for error handling.
- Added constants for pagination defaults in requests.
- Introduced MAX_TOP_FRIENDS constant for top friends validation.
- Refactored JWT expiration time to use a constant.
- Improved rate limiter configuration with constants for better readability.
- Added utility methods for FollowState and Visibility enums for string conversions.
- Introduced maximum length constants for Username, Email, and Content value objects.
- Cleaned up test modules by removing redundant code and utilizing a shared testing state.
This commit is contained in:
2026-05-15 12:31:25 +02:00
parent a040a38036
commit 314dad5451
40 changed files with 456 additions and 690 deletions

View File

@@ -91,6 +91,29 @@ pub struct Person {
attachment: Vec<ProfileFieldObject>,
}
struct ActorUrls {
ap_id: Url,
inbox_url: Url,
shared_inbox_url: Option<Url>,
outbox_url: Url,
followers_url: Url,
following_url: Url,
}
impl ActorUrls {
fn build(base_url: &str, user_id: uuid::Uuid) -> Self {
let ap_id = crate::urls::actor_url(base_url, user_id);
Self {
inbox_url: Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url"),
shared_inbox_url: Url::parse(&format!("{}/inbox", base_url)).ok(),
outbox_url: Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url"),
followers_url: Url::parse(&format!("{}/followers", &ap_id)).expect("valid url"),
following_url: Url::parse(&format!("{}/following", &ap_id)).expect("valid url"),
ap_id,
}
}
}
pub async fn get_local_actor(
user_id: uuid::Uuid,
data: &Data<FederationData>,
@@ -117,12 +140,14 @@ pub async fn get_local_actor(
}
};
let ap_id = crate::urls::actor_url(&data.base_url, user_id);
let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid inbox url");
let shared_inbox_url = Url::parse(&format!("{}/inbox", data.base_url)).ok();
let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid outbox url");
let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid followers url");
let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid following url");
let ActorUrls {
ap_id,
inbox_url,
shared_inbox_url,
outbox_url,
followers_url,
following_url,
} = ActorUrls::build(&data.base_url, user_id);
Ok(DbActor {
user_id,
@@ -182,12 +207,14 @@ impl Object for DbActor {
None => return Ok(None),
};
let ap_id = crate::urls::actor_url(&data.base_url, user_id);
let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url");
let shared_inbox_url = Url::parse(&format!("{}/inbox", data.base_url)).ok();
let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url");
let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid url");
let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid url");
let ActorUrls {
ap_id,
inbox_url,
shared_inbox_url,
outbox_url,
followers_url,
following_url,
} = ActorUrls::build(&data.base_url, user_id);
Ok(Some(DbActor {
user_id,

View File

@@ -5,70 +5,95 @@ use serde_json::json;
use crate::data::FederationData;
use crate::error::Error;
const PAGE_SIZE: usize = 20;
use crate::urls::AP_PAGE_SIZE;
#[derive(Deserialize)]
pub struct PageQuery {
page: Option<u32>,
}
async fn collection_handler(
user_id_str: &str,
query: PageQuery,
data: Data<FederationData>,
collection_type: &str,
) -> Result<FederationJson<serde_json::Value>, Error> {
let user_id = uuid::Uuid::parse_str(user_id_str)
.map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?;
data.user_repo
.find_by_id(user_id)
.await
.map_err(Error::from)?
.ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?;
let collection_id = format!(
"{}/users/{}/{}",
data.base_url, user_id_str, collection_type
);
let total = match collection_type {
"followers" => data.federation_repo.count_followers(user_id).await,
_ => data.federation_repo.count_following(user_id).await,
}
.map_err(Error::from)?;
if let Some(page) = query.page {
let page = page.max(1);
let offset = (page.saturating_sub(1) as usize) * AP_PAGE_SIZE;
let items: Vec<String> = match collection_type {
"followers" => data
.federation_repo
.get_followers_page(user_id, offset as u32, AP_PAGE_SIZE)
.await
.map_err(Error::from)?
.into_iter()
.map(|f| f.actor.url)
.collect(),
_ => data
.federation_repo
.get_following_page(user_id, offset as u32, AP_PAGE_SIZE)
.await
.map_err(Error::from)?
.into_iter()
.map(|a| a.url)
.collect(),
};
let has_next = offset + items.len() < total;
let mut obj = json!({
"@context": crate::urls::AP_CONTEXT,
"type": "OrderedCollectionPage",
"id": format!("{}?page={}", collection_id, page),
"partOf": collection_id,
"totalItems": total,
"orderedItems": items,
});
if has_next {
obj["next"] = json!(format!("{}?page={}", collection_id, page + 1));
}
Ok(FederationJson(obj))
} else {
Ok(FederationJson(json!({
"@context": crate::urls::AP_CONTEXT,
"type": "OrderedCollection",
"id": collection_id,
"totalItems": total,
"first": format!("{}?page=1", collection_id),
})))
}
}
pub async fn followers_handler(
Path(user_id_str): Path<String>,
Query(query): Query<PageQuery>,
data: Data<FederationData>,
) -> Result<FederationJson<serde_json::Value>, Error> {
let user_id = uuid::Uuid::parse_str(&user_id_str)
.map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?;
data.user_repo
.find_by_id(user_id)
.await
.map_err(Error::from)?
.ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?;
let collection_id = format!("{}/users/{}/followers", data.base_url, user_id_str);
let total = data
.federation_repo
.count_followers(user_id)
.await
.map_err(Error::from)?;
if let Some(page) = query.page {
let page = page.max(1);
let offset = (page.saturating_sub(1) as usize) * PAGE_SIZE;
let followers = data
.federation_repo
.get_followers_page(user_id, offset as u32, PAGE_SIZE)
.await
.map_err(Error::from)?;
let has_next = offset + followers.len() < total;
let items: Vec<String> = followers.into_iter().map(|f| f.actor.url).collect();
let mut obj = json!({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "OrderedCollectionPage",
"id": format!("{}?page={}", collection_id, page),
"partOf": collection_id,
"totalItems": total,
"orderedItems": items,
});
if has_next {
obj["next"] = json!(format!("{}?page={}", collection_id, page + 1));
}
Ok(FederationJson(obj))
} else {
Ok(FederationJson(json!({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "OrderedCollection",
"id": collection_id,
"totalItems": total,
"first": format!("{}?page=1", collection_id),
})))
}
collection_handler(&user_id_str, query, data, "followers").await
}
pub async fn following_handler(
@@ -76,55 +101,5 @@ pub async fn following_handler(
Query(query): Query<PageQuery>,
data: Data<FederationData>,
) -> Result<FederationJson<serde_json::Value>, Error> {
let user_id = uuid::Uuid::parse_str(&user_id_str)
.map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?;
data.user_repo
.find_by_id(user_id)
.await
.map_err(Error::from)?
.ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?;
let collection_id = format!("{}/users/{}/following", data.base_url, user_id_str);
let total = data
.federation_repo
.count_following(user_id)
.await
.map_err(Error::from)?;
if let Some(page) = query.page {
let page = page.max(1);
let offset = (page.saturating_sub(1) as usize) * PAGE_SIZE;
let following = data
.federation_repo
.get_following_page(user_id, offset as u32, PAGE_SIZE)
.await
.map_err(Error::from)?;
let has_next = offset + following.len() < total;
let items: Vec<String> = following.into_iter().map(|a| a.url).collect();
let mut obj = json!({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "OrderedCollectionPage",
"id": format!("{}?page={}", collection_id, page),
"partOf": collection_id,
"totalItems": total,
"orderedItems": items,
});
if has_next {
obj["next"] = json!(format!("{}?page={}", collection_id, page + 1));
}
Ok(FederationJson(obj))
} else {
Ok(FederationJson(json!({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "OrderedCollection",
"id": collection_id,
"totalItems": total,
"first": format!("{}?page=1", collection_id),
})))
}
collection_handler(&user_id_str, query, data, "following").await
}

View File

@@ -5,6 +5,8 @@ use serde::Serialize;
use crate::data::FederationData;
use crate::error::Error;
const NODEINFO_2_0_REL: &str = "http://nodeinfo.diaspora.software/ns/schema/2.0";
#[derive(Serialize)]
pub struct NodeInfoWellKnown {
pub links: Vec<NodeInfoLink>,
@@ -50,7 +52,7 @@ pub async fn nodeinfo_well_known_handler(
let href = format!("{}/nodeinfo/2.0", data.base_url);
Ok(Json(NodeInfoWellKnown {
links: vec![NodeInfoLink {
rel: "http://nodeinfo.diaspora.software/ns/schema/2.0".to_string(),
rel: NODEINFO_2_0_REL.to_string(),
href,
}],
}))

View File

@@ -9,9 +9,7 @@ use activitypub_federation::{
protocol::context::WithContext,
};
use crate::{activities::CreateActivity, data::FederationData, error::Error};
const PAGE_SIZE: usize = 20;
use crate::{activities::CreateActivity, data::FederationData, error::Error, urls::AP_PAGE_SIZE};
#[derive(Deserialize)]
pub struct OutboxQuery {
@@ -66,7 +64,7 @@ pub async fn outbox_handler(
let items = data
.object_handler
.get_local_objects_page(uuid, before, PAGE_SIZE)
.get_local_objects_page(uuid, before, AP_PAGE_SIZE)
.await
.map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?;
@@ -74,7 +72,7 @@ pub async fn outbox_handler(
.parse()
.expect("valid url");
let has_more = items.len() == PAGE_SIZE;
let has_more = items.len() == AP_PAGE_SIZE;
let oldest_ts = items.last().map(|(_, _, ts)| *ts);
let followers_url = format!("{}/followers", actor_url);
@@ -110,7 +108,7 @@ pub async fn outbox_handler(
};
Ok(axum::Json(OrderedCollectionPage {
context: "https://www.w3.org/ns/activitystreams".to_string(),
context: crate::urls::AP_CONTEXT.to_string(),
kind: "OrderedCollectionPage".to_string(),
id: page_id,
part_of: outbox_url,
@@ -127,7 +125,7 @@ pub async fn outbox_handler(
.len() as u64;
Ok(axum::Json(OrderedCollection {
context: "https://www.w3.org/ns/activitystreams".to_string(),
context: crate::urls::AP_CONTEXT.to_string(),
kind: "OrderedCollection".to_string(),
id: outbox_url.clone(),
total_items: total,

View File

@@ -27,6 +27,11 @@ use crate::{
webfinger::webfinger_handler,
};
const DELIVERY_MAX_ATTEMPTS: u32 = 3;
const DELIVERY_INITIAL_DELAY_SECS: u64 = 1;
const HTTP_FETCH_TIMEOUT_SECS: u64 = 30;
const BATCH_FETCH_SLEEP_MS: u64 = 100;
fn content_to_html(text: &str) -> String {
let escaped = text
.replace('&', "&amp;")
@@ -139,11 +144,11 @@ pub(crate) async fn send_with_retry(
) -> Vec<anyhow::Error> {
let mut failures = vec![];
for send in sends {
let mut delay = std::time::Duration::from_secs(1);
for attempt in 1..=3u32 {
let mut delay = std::time::Duration::from_secs(DELIVERY_INITIAL_DELAY_SECS);
for attempt in 1..=DELIVERY_MAX_ATTEMPTS {
match send.clone().sign_and_send(data).await {
Ok(()) => break,
Err(e) if attempt < 3 => {
Err(e) if attempt < DELIVERY_MAX_ATTEMPTS => {
tracing::warn!(attempt, error = %e, "delivery failed, retrying");
tokio::time::sleep(delay).await;
delay *= 2;
@@ -1206,7 +1211,7 @@ impl ActivityPubService {
pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS))
.build()?;
let data = self.federation_config.to_request_data();
let actor = url::Url::parse(actor_url)?;
@@ -1384,7 +1389,7 @@ impl ActivityPubService {
failure_count += 1;
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS)).await;
}
tracing::info!(
@@ -1705,16 +1710,16 @@ impl domain::ports::FederationActionPort for ActivityPubService {
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
let obj = if let Some(p) = page {
let p = p.max(1);
let offset = (p.saturating_sub(1) as usize) * 20;
let offset = (p.saturating_sub(1) as usize) * crate::urls::AP_PAGE_SIZE;
let followers = data
.federation_repo
.get_followers_page(uuid, offset as u32, 20)
.get_followers_page(uuid, offset as u32, crate::urls::AP_PAGE_SIZE)
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
let has_next = offset + followers.len() < total;
let items: Vec<String> = followers.into_iter().map(|f| f.actor.url).collect();
let mut obj = serde_json::json!({
"@context": "https://www.w3.org/ns/activitystreams",
"@context": crate::urls::AP_CONTEXT,
"type": "OrderedCollectionPage",
"id": format!("{}?page={}", collection_id, p),
"partOf": collection_id,
@@ -1727,7 +1732,7 @@ impl domain::ports::FederationActionPort for ActivityPubService {
obj
} else {
serde_json::json!({
"@context": "https://www.w3.org/ns/activitystreams",
"@context": crate::urls::AP_CONTEXT,
"type": "OrderedCollection",
"id": collection_id,
"totalItems": total,
@@ -1753,16 +1758,16 @@ impl domain::ports::FederationActionPort for ActivityPubService {
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
let obj = if let Some(p) = page {
let p = p.max(1);
let offset = (p.saturating_sub(1) as usize) * 20;
let offset = (p.saturating_sub(1) as usize) * crate::urls::AP_PAGE_SIZE;
let following = data
.federation_repo
.get_following_page(uuid, offset as u32, 20)
.get_following_page(uuid, offset as u32, crate::urls::AP_PAGE_SIZE)
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
let has_next = offset + following.len() < total;
let items: Vec<String> = following.into_iter().map(|a| a.url).collect();
let mut obj = serde_json::json!({
"@context": "https://www.w3.org/ns/activitystreams",
"@context": crate::urls::AP_CONTEXT,
"type": "OrderedCollectionPage",
"id": format!("{}?page={}", collection_id, p),
"partOf": collection_id,
@@ -1775,7 +1780,7 @@ impl domain::ports::FederationActionPort for ActivityPubService {
obj
} else {
serde_json::json!({
"@context": "https://www.w3.org/ns/activitystreams",
"@context": crate::urls::AP_CONTEXT,
"type": "OrderedCollection",
"id": collection_id,
"totalItems": total,

View File

@@ -3,6 +3,8 @@ use url::Url;
use crate::error::Error;
pub const AS_PUBLIC: &str = "https://www.w3.org/ns/activitystreams#Public";
pub const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
pub const AP_PAGE_SIZE: usize = 20;
pub fn extract_user_id_from_url(url: &Url) -> Option<uuid::Uuid> {
let path = url.path();