From 314dad54516b754ee050fcdc1864fb3acd6e7b89 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 12:31:25 +0200 Subject: [PATCH] Refactor database error handling across repositories to use IntoDbResult for improved error management - Updated PgNotificationRepository to utilize IntoDbResult for error handling in various methods. - Refactored PgRemoteActorRepository to replace manual error mapping with IntoDbResult. - Modified PgRemoteActorConnectionRepository to implement IntoDbResult for error handling. - Adjusted PgTagRepository to use IntoDbResult for consistent error management. - Introduced test_helpers module for seeding users and thoughts in tests. - Enhanced PgThoughtRepository to leverage IntoDbResult for error handling. - Updated PgTopFriendRepository to utilize IntoDbResult for error management. - Refactored PgUserRepository to implement IntoDbResult for error handling. - Added constants for pagination defaults in requests. - Introduced MAX_TOP_FRIENDS constant for top friends validation. - Refactored JWT expiration time to use a constant. - Improved rate limiter configuration with constants for better readability. - Added utility methods for FollowState and Visibility enums for string conversions. - Introduced maximum length constants for Username, Email, and Content value objects. - Cleaned up test modules by removing redundant code and utilizing a shared testing state. --- .../adapters/activitypub-base/src/actors.rs | 51 +++-- .../activitypub-base/src/followers_handler.rs | 183 ++++++++---------- .../adapters/activitypub-base/src/nodeinfo.rs | 4 +- .../adapters/activitypub-base/src/outbox.rs | 12 +- .../adapters/activitypub-base/src/service.rs | 31 +-- crates/adapters/activitypub-base/src/urls.rs | 2 + crates/adapters/activitypub/src/handler.rs | 11 +- crates/adapters/postgres-search/Cargo.toml | 1 + crates/adapters/postgres-search/src/lib.rs | 58 +----- crates/adapters/postgres/src/activitypub.rs | 30 +-- crates/adapters/postgres/src/api_key.rs | 9 +- crates/adapters/postgres/src/block.rs | 24 +-- crates/adapters/postgres/src/boost.rs | 47 +---- crates/adapters/postgres/src/db_error.rs | 11 ++ crates/adapters/postgres/src/feed.rs | 35 ++-- crates/adapters/postgres/src/follow.rs | 60 ++---- crates/adapters/postgres/src/lib.rs | 3 + crates/adapters/postgres/src/like.rs | 47 +---- crates/adapters/postgres/src/notification.rs | 13 +- crates/adapters/postgres/src/remote_actor.rs | 5 +- .../postgres/src/remote_actor_connections.rs | 7 +- crates/adapters/postgres/src/tag.rs | 17 +- crates/adapters/postgres/src/test_helpers.rs | 37 ++++ crates/adapters/postgres/src/thought.rs | 61 ++---- crates/adapters/postgres/src/top_friend.rs | 17 +- crates/adapters/postgres/src/user.rs | 19 +- crates/api-types/src/requests.rs | 8 +- crates/application/src/use_cases/profile.rs | 4 +- crates/bootstrap/src/factory.rs | 4 +- crates/bootstrap/src/main.rs | 9 +- crates/domain/src/models/social.rs | 18 ++ crates/domain/src/models/thought.rs | 9 + crates/domain/src/value_objects.rs | 10 +- .../src/handlers/federation_actors.rs | 55 +----- crates/presentation/src/handlers/feed.rs | 12 +- .../src/handlers/notifications.rs | 55 +----- crates/presentation/src/handlers/social.rs | 55 +----- crates/presentation/src/handlers/users.rs | 55 +----- crates/presentation/src/lib.rs | 2 + crates/presentation/src/testing.rs | 55 ++++++ 40 files changed, 456 insertions(+), 690 deletions(-) create mode 100644 crates/adapters/postgres/src/db_error.rs create mode 100644 crates/adapters/postgres/src/test_helpers.rs create mode 100644 crates/presentation/src/testing.rs diff --git a/crates/adapters/activitypub-base/src/actors.rs b/crates/adapters/activitypub-base/src/actors.rs index 5bec4e3..e321485 100644 --- a/crates/adapters/activitypub-base/src/actors.rs +++ b/crates/adapters/activitypub-base/src/actors.rs @@ -91,6 +91,29 @@ pub struct Person { attachment: Vec, } +struct ActorUrls { + ap_id: Url, + inbox_url: Url, + shared_inbox_url: Option, + outbox_url: Url, + followers_url: Url, + following_url: Url, +} + +impl ActorUrls { + fn build(base_url: &str, user_id: uuid::Uuid) -> Self { + let ap_id = crate::urls::actor_url(base_url, user_id); + Self { + inbox_url: Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url"), + shared_inbox_url: Url::parse(&format!("{}/inbox", base_url)).ok(), + outbox_url: Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url"), + followers_url: Url::parse(&format!("{}/followers", &ap_id)).expect("valid url"), + following_url: Url::parse(&format!("{}/following", &ap_id)).expect("valid url"), + ap_id, + } + } +} + pub async fn get_local_actor( user_id: uuid::Uuid, data: &Data, @@ -117,12 +140,14 @@ pub async fn get_local_actor( } }; - let ap_id = crate::urls::actor_url(&data.base_url, user_id); - let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid inbox url"); - let shared_inbox_url = Url::parse(&format!("{}/inbox", data.base_url)).ok(); - let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid outbox url"); - let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid followers url"); - let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid following url"); + let ActorUrls { + ap_id, + inbox_url, + shared_inbox_url, + outbox_url, + followers_url, + following_url, + } = ActorUrls::build(&data.base_url, user_id); Ok(DbActor { user_id, @@ -182,12 +207,14 @@ impl Object for DbActor { None => return Ok(None), }; - let ap_id = crate::urls::actor_url(&data.base_url, user_id); - let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url"); - let shared_inbox_url = Url::parse(&format!("{}/inbox", data.base_url)).ok(); - let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url"); - let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid url"); - let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid url"); + let ActorUrls { + ap_id, + inbox_url, + shared_inbox_url, + outbox_url, + followers_url, + following_url, + } = ActorUrls::build(&data.base_url, user_id); Ok(Some(DbActor { user_id, diff --git a/crates/adapters/activitypub-base/src/followers_handler.rs b/crates/adapters/activitypub-base/src/followers_handler.rs index 36b4800..e5de463 100644 --- a/crates/adapters/activitypub-base/src/followers_handler.rs +++ b/crates/adapters/activitypub-base/src/followers_handler.rs @@ -5,70 +5,95 @@ use serde_json::json; use crate::data::FederationData; use crate::error::Error; - -const PAGE_SIZE: usize = 20; +use crate::urls::AP_PAGE_SIZE; #[derive(Deserialize)] pub struct PageQuery { page: Option, } +async fn collection_handler( + user_id_str: &str, + query: PageQuery, + data: Data, + collection_type: &str, +) -> Result, Error> { + let user_id = uuid::Uuid::parse_str(user_id_str) + .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; + + data.user_repo + .find_by_id(user_id) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; + + let collection_id = format!( + "{}/users/{}/{}", + data.base_url, user_id_str, collection_type + ); + + let total = match collection_type { + "followers" => data.federation_repo.count_followers(user_id).await, + _ => data.federation_repo.count_following(user_id).await, + } + .map_err(Error::from)?; + + if let Some(page) = query.page { + let page = page.max(1); + let offset = (page.saturating_sub(1) as usize) * AP_PAGE_SIZE; + + let items: Vec = match collection_type { + "followers" => data + .federation_repo + .get_followers_page(user_id, offset as u32, AP_PAGE_SIZE) + .await + .map_err(Error::from)? + .into_iter() + .map(|f| f.actor.url) + .collect(), + _ => data + .federation_repo + .get_following_page(user_id, offset as u32, AP_PAGE_SIZE) + .await + .map_err(Error::from)? + .into_iter() + .map(|a| a.url) + .collect(), + }; + + let has_next = offset + items.len() < total; + + let mut obj = json!({ + "@context": crate::urls::AP_CONTEXT, + "type": "OrderedCollectionPage", + "id": format!("{}?page={}", collection_id, page), + "partOf": collection_id, + "totalItems": total, + "orderedItems": items, + }); + + if has_next { + obj["next"] = json!(format!("{}?page={}", collection_id, page + 1)); + } + + Ok(FederationJson(obj)) + } else { + Ok(FederationJson(json!({ + "@context": crate::urls::AP_CONTEXT, + "type": "OrderedCollection", + "id": collection_id, + "totalItems": total, + "first": format!("{}?page=1", collection_id), + }))) + } +} + pub async fn followers_handler( Path(user_id_str): Path, Query(query): Query, data: Data, ) -> Result, Error> { - let user_id = uuid::Uuid::parse_str(&user_id_str) - .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; - - data.user_repo - .find_by_id(user_id) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let collection_id = format!("{}/users/{}/followers", data.base_url, user_id_str); - let total = data - .federation_repo - .count_followers(user_id) - .await - .map_err(Error::from)?; - - if let Some(page) = query.page { - let page = page.max(1); - let offset = (page.saturating_sub(1) as usize) * PAGE_SIZE; - let followers = data - .federation_repo - .get_followers_page(user_id, offset as u32, PAGE_SIZE) - .await - .map_err(Error::from)?; - - let has_next = offset + followers.len() < total; - let items: Vec = followers.into_iter().map(|f| f.actor.url).collect(); - - let mut obj = json!({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollectionPage", - "id": format!("{}?page={}", collection_id, page), - "partOf": collection_id, - "totalItems": total, - "orderedItems": items, - }); - - if has_next { - obj["next"] = json!(format!("{}?page={}", collection_id, page + 1)); - } - - Ok(FederationJson(obj)) - } else { - Ok(FederationJson(json!({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollection", - "id": collection_id, - "totalItems": total, - "first": format!("{}?page=1", collection_id), - }))) - } + collection_handler(&user_id_str, query, data, "followers").await } pub async fn following_handler( @@ -76,55 +101,5 @@ pub async fn following_handler( Query(query): Query, data: Data, ) -> Result, Error> { - let user_id = uuid::Uuid::parse_str(&user_id_str) - .map_err(|_| Error::bad_request(anyhow::anyhow!("invalid user id")))?; - - data.user_repo - .find_by_id(user_id) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::not_found(anyhow::anyhow!("user not found")))?; - - let collection_id = format!("{}/users/{}/following", data.base_url, user_id_str); - let total = data - .federation_repo - .count_following(user_id) - .await - .map_err(Error::from)?; - - if let Some(page) = query.page { - let page = page.max(1); - let offset = (page.saturating_sub(1) as usize) * PAGE_SIZE; - let following = data - .federation_repo - .get_following_page(user_id, offset as u32, PAGE_SIZE) - .await - .map_err(Error::from)?; - - let has_next = offset + following.len() < total; - let items: Vec = following.into_iter().map(|a| a.url).collect(); - - let mut obj = json!({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollectionPage", - "id": format!("{}?page={}", collection_id, page), - "partOf": collection_id, - "totalItems": total, - "orderedItems": items, - }); - - if has_next { - obj["next"] = json!(format!("{}?page={}", collection_id, page + 1)); - } - - Ok(FederationJson(obj)) - } else { - Ok(FederationJson(json!({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollection", - "id": collection_id, - "totalItems": total, - "first": format!("{}?page=1", collection_id), - }))) - } + collection_handler(&user_id_str, query, data, "following").await } diff --git a/crates/adapters/activitypub-base/src/nodeinfo.rs b/crates/adapters/activitypub-base/src/nodeinfo.rs index 1b95ae8..f619d75 100644 --- a/crates/adapters/activitypub-base/src/nodeinfo.rs +++ b/crates/adapters/activitypub-base/src/nodeinfo.rs @@ -5,6 +5,8 @@ use serde::Serialize; use crate::data::FederationData; use crate::error::Error; +const NODEINFO_2_0_REL: &str = "http://nodeinfo.diaspora.software/ns/schema/2.0"; + #[derive(Serialize)] pub struct NodeInfoWellKnown { pub links: Vec, @@ -50,7 +52,7 @@ pub async fn nodeinfo_well_known_handler( let href = format!("{}/nodeinfo/2.0", data.base_url); Ok(Json(NodeInfoWellKnown { links: vec![NodeInfoLink { - rel: "http://nodeinfo.diaspora.software/ns/schema/2.0".to_string(), + rel: NODEINFO_2_0_REL.to_string(), href, }], })) diff --git a/crates/adapters/activitypub-base/src/outbox.rs b/crates/adapters/activitypub-base/src/outbox.rs index d9a209e..327477b 100644 --- a/crates/adapters/activitypub-base/src/outbox.rs +++ b/crates/adapters/activitypub-base/src/outbox.rs @@ -9,9 +9,7 @@ use activitypub_federation::{ protocol::context::WithContext, }; -use crate::{activities::CreateActivity, data::FederationData, error::Error}; - -const PAGE_SIZE: usize = 20; +use crate::{activities::CreateActivity, data::FederationData, error::Error, urls::AP_PAGE_SIZE}; #[derive(Deserialize)] pub struct OutboxQuery { @@ -66,7 +64,7 @@ pub async fn outbox_handler( let items = data .object_handler - .get_local_objects_page(uuid, before, PAGE_SIZE) + .get_local_objects_page(uuid, before, AP_PAGE_SIZE) .await .map_err(|e| Error::from(anyhow::anyhow!("{}", e)))?; @@ -74,7 +72,7 @@ pub async fn outbox_handler( .parse() .expect("valid url"); - let has_more = items.len() == PAGE_SIZE; + let has_more = items.len() == AP_PAGE_SIZE; let oldest_ts = items.last().map(|(_, _, ts)| *ts); let followers_url = format!("{}/followers", actor_url); @@ -110,7 +108,7 @@ pub async fn outbox_handler( }; Ok(axum::Json(OrderedCollectionPage { - context: "https://www.w3.org/ns/activitystreams".to_string(), + context: crate::urls::AP_CONTEXT.to_string(), kind: "OrderedCollectionPage".to_string(), id: page_id, part_of: outbox_url, @@ -127,7 +125,7 @@ pub async fn outbox_handler( .len() as u64; Ok(axum::Json(OrderedCollection { - context: "https://www.w3.org/ns/activitystreams".to_string(), + context: crate::urls::AP_CONTEXT.to_string(), kind: "OrderedCollection".to_string(), id: outbox_url.clone(), total_items: total, diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index 7a406e9..331dff6 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -27,6 +27,11 @@ use crate::{ webfinger::webfinger_handler, }; +const DELIVERY_MAX_ATTEMPTS: u32 = 3; +const DELIVERY_INITIAL_DELAY_SECS: u64 = 1; +const HTTP_FETCH_TIMEOUT_SECS: u64 = 30; +const BATCH_FETCH_SLEEP_MS: u64 = 100; + fn content_to_html(text: &str) -> String { let escaped = text .replace('&', "&") @@ -139,11 +144,11 @@ pub(crate) async fn send_with_retry( ) -> Vec { let mut failures = vec![]; for send in sends { - let mut delay = std::time::Duration::from_secs(1); - for attempt in 1..=3u32 { + let mut delay = std::time::Duration::from_secs(DELIVERY_INITIAL_DELAY_SECS); + for attempt in 1..=DELIVERY_MAX_ATTEMPTS { match send.clone().sign_and_send(data).await { Ok(()) => break, - Err(e) if attempt < 3 => { + Err(e) if attempt < DELIVERY_MAX_ATTEMPTS => { tracing::warn!(attempt, error = %e, "delivery failed, retrying"); tokio::time::sleep(delay).await; delay *= 2; @@ -1206,7 +1211,7 @@ impl ActivityPubService { pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(30)) + .timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS)) .build()?; let data = self.federation_config.to_request_data(); let actor = url::Url::parse(actor_url)?; @@ -1384,7 +1389,7 @@ impl ActivityPubService { failure_count += 1; } } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS)).await; } tracing::info!( @@ -1705,16 +1710,16 @@ impl domain::ports::FederationActionPort for ActivityPubService { .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; let obj = if let Some(p) = page { let p = p.max(1); - let offset = (p.saturating_sub(1) as usize) * 20; + let offset = (p.saturating_sub(1) as usize) * crate::urls::AP_PAGE_SIZE; let followers = data .federation_repo - .get_followers_page(uuid, offset as u32, 20) + .get_followers_page(uuid, offset as u32, crate::urls::AP_PAGE_SIZE) .await .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; let has_next = offset + followers.len() < total; let items: Vec = followers.into_iter().map(|f| f.actor.url).collect(); let mut obj = serde_json::json!({ - "@context": "https://www.w3.org/ns/activitystreams", + "@context": crate::urls::AP_CONTEXT, "type": "OrderedCollectionPage", "id": format!("{}?page={}", collection_id, p), "partOf": collection_id, @@ -1727,7 +1732,7 @@ impl domain::ports::FederationActionPort for ActivityPubService { obj } else { serde_json::json!({ - "@context": "https://www.w3.org/ns/activitystreams", + "@context": crate::urls::AP_CONTEXT, "type": "OrderedCollection", "id": collection_id, "totalItems": total, @@ -1753,16 +1758,16 @@ impl domain::ports::FederationActionPort for ActivityPubService { .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; let obj = if let Some(p) = page { let p = p.max(1); - let offset = (p.saturating_sub(1) as usize) * 20; + let offset = (p.saturating_sub(1) as usize) * crate::urls::AP_PAGE_SIZE; let following = data .federation_repo - .get_following_page(uuid, offset as u32, 20) + .get_following_page(uuid, offset as u32, crate::urls::AP_PAGE_SIZE) .await .map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?; let has_next = offset + following.len() < total; let items: Vec = following.into_iter().map(|a| a.url).collect(); let mut obj = serde_json::json!({ - "@context": "https://www.w3.org/ns/activitystreams", + "@context": crate::urls::AP_CONTEXT, "type": "OrderedCollectionPage", "id": format!("{}?page={}", collection_id, p), "partOf": collection_id, @@ -1775,7 +1780,7 @@ impl domain::ports::FederationActionPort for ActivityPubService { obj } else { serde_json::json!({ - "@context": "https://www.w3.org/ns/activitystreams", + "@context": crate::urls::AP_CONTEXT, "type": "OrderedCollection", "id": collection_id, "totalItems": total, diff --git a/crates/adapters/activitypub-base/src/urls.rs b/crates/adapters/activitypub-base/src/urls.rs index 884897a..36bf9c8 100644 --- a/crates/adapters/activitypub-base/src/urls.rs +++ b/crates/adapters/activitypub-base/src/urls.rs @@ -3,6 +3,8 @@ use url::Url; use crate::error::Error; pub const AS_PUBLIC: &str = "https://www.w3.org/ns/activitystreams#Public"; +pub const AP_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; +pub const AP_PAGE_SIZE: usize = 20; pub fn extract_user_id_from_url(url: &Url) -> Option { let path = url.path(); diff --git a/crates/adapters/activitypub/src/handler.rs b/crates/adapters/activitypub/src/handler.rs index e751e27..70d0bbb 100644 --- a/crates/adapters/activitypub/src/handler.rs +++ b/crates/adapters/activitypub/src/handler.rs @@ -1,5 +1,8 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; + +const USERS_PATH_PREFIX: &str = "/users/"; +const THOUGHTS_PATH_PREFIX: &str = "/thoughts/"; use chrono::{DateTime, Utc}; use std::sync::Arc; use url::Url; @@ -172,7 +175,7 @@ impl ApObjectHandler for ThoughtsObjectHandler { } let user_uuid = href_url .path() - .strip_prefix("/users/") + .strip_prefix(USERS_PATH_PREFIX) .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(uuid) = user_uuid { @@ -217,7 +220,7 @@ impl ApObjectHandler for ThoughtsObjectHandler { async fn on_like(&self, object_url: &Url, actor_url: &Url) -> Result<()> { let thought_uuid = object_url .path() - .strip_prefix("/thoughts/") + .strip_prefix(THOUGHTS_PATH_PREFIX) .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); @@ -280,7 +283,7 @@ impl ApObjectHandler for ThoughtsObjectHandler { let thought_uuid = thought_ap_id .path() - .strip_prefix("/thoughts/") + .strip_prefix(THOUGHTS_PATH_PREFIX) .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); @@ -305,7 +308,7 @@ impl ApObjectHandler for ThoughtsObjectHandler { async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> Result<()> { let thought_uuid = object_url .path() - .strip_prefix("/thoughts/") + .strip_prefix(THOUGHTS_PATH_PREFIX) .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); diff --git a/crates/adapters/postgres-search/Cargo.toml b/crates/adapters/postgres-search/Cargo.toml index 071d1c2..4d4c0bb 100644 --- a/crates/adapters/postgres-search/Cargo.toml +++ b/crates/adapters/postgres-search/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] domain = { workspace = true } +postgres = { workspace = true } sqlx = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } diff --git a/crates/adapters/postgres-search/src/lib.rs b/crates/adapters/postgres-search/src/lib.rs index 3ea28b4..dd67998 100644 --- a/crates/adapters/postgres-search/src/lib.rs +++ b/crates/adapters/postgres-search/src/lib.rs @@ -1,26 +1,17 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -fn visibility_from_str(s: &str) -> domain::models::thought::Visibility { - use domain::models::thought::Visibility; - match s { - "followers" => Visibility::Followers, - "unlisted" => Visibility::Unlisted, - "direct" => Visibility::Direct, - _ => Visibility::Public, - } -} - use domain::{ errors::DomainError, models::{ feed::{FeedEntry, PageParams, Paginated}, - thought::Thought, + thought::{Thought, Visibility}, user::User, }, ports::SearchPort, value_objects::{Content, Email, PasswordHash, ThoughtId, UserId, Username}, }; +use postgres::user::{UserRow, USER_SELECT}; use sqlx::PgPool; pub struct PgSearchRepository { @@ -88,7 +79,7 @@ fn row_to_entry(r: FeedRow) -> FeedEntry { in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), in_reply_to_url: r.in_reply_to_url, ap_id: r.t_ap_id, - visibility: visibility_from_str(&r.visibility), + visibility: Visibility::from_db_str(&r.visibility), content_warning: r.content_warning, sensitive: r.sensitive, local: r.t_local, @@ -122,49 +113,6 @@ fn row_to_entry(r: FeedRow) -> FeedEntry { } } -#[derive(sqlx::FromRow)] -struct UserRow { - id: uuid::Uuid, - username: String, - email: String, - password_hash: String, - display_name: Option, - bio: Option, - avatar_url: Option, - header_url: Option, - custom_css: Option, - local: bool, - ap_id: Option, - inbox_url: Option, - created_at: DateTime, - updated_at: DateTime, -} - -impl From for User { - fn from(r: UserRow) -> Self { - User { - id: UserId::from_uuid(r.id), - username: Username::from_trusted(r.username), - email: Email::from_trusted(r.email), - password_hash: PasswordHash(r.password_hash), - display_name: r.display_name, - bio: r.bio, - avatar_url: r.avatar_url, - header_url: r.header_url, - custom_css: r.custom_css, - local: r.local, - ap_id: r.ap_id, - inbox_url: r.inbox_url, - created_at: r.created_at, - updated_at: r.updated_at, - } - } -} - -const USER_SELECT: &str = - "SELECT id,username,email,password_hash,display_name,bio,avatar_url,header_url,\ - custom_css,local,ap_id,inbox_url,created_at,updated_at FROM users"; - #[async_trait] impl SearchPort for PgSearchRepository { async fn search_thoughts( diff --git a/crates/adapters/postgres/src/activitypub.rs b/crates/adapters/postgres/src/activitypub.rs index a7d527c..44d7330 100644 --- a/crates/adapters/postgres/src/activitypub.rs +++ b/crates/adapters/postgres/src/activitypub.rs @@ -1,4 +1,8 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; + +const MAX_REMOTE_CONTENT_CHARS: usize = 500; +const THOUGHTS_PATH_PREFIX: &str = "/thoughts/"; use chrono::{DateTime, Utc}; use sqlx::PgPool; use url::Url; @@ -47,7 +51,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(user_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|rows| { rows.into_iter() .map(|r| OutboxEntry { @@ -113,7 +117,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .fetch_all(&self.pool) .await } - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(rows .into_iter() @@ -145,7 +149,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(actor_ap_url.as_str()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(UserId::from_uuid)) } @@ -179,7 +183,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(actor_ap_url.as_str()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; // Re-fetch to get whichever id won the race self.find_remote_actor_id(actor_ap_url) .await? @@ -205,7 +209,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(user_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -220,13 +224,13 @@ impl ActivityPubRepository for PgActivityPubRepository { visibility: &str, in_reply_to: Option<&Url>, ) -> Result<(), DomainError> { - let capped: String = content.chars().take(500).collect(); + let capped: String = content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect(); let (in_reply_to_id, in_reply_to_url) = match in_reply_to { Some(url) => { // If the parent is a local thought, extract its UUID for in_reply_to_id. let local_uuid = url .path() - .strip_prefix("/thoughts/") + .strip_prefix(THOUGHTS_PATH_PREFIX) .and_then(|s| s.split('/').next()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); (local_uuid, Some(url.as_str().to_string())) @@ -249,12 +253,12 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(&in_reply_to_url) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } async fn apply_note_update(&self, ap_id: &Url, new_content: &str) -> Result<(), DomainError> { - let capped: String = new_content.chars().take(500).collect(); + let capped: String = new_content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect(); sqlx::query( "UPDATE thoughts SET content=$2,updated_at=NOW() WHERE ap_id=$1 AND local=false", ) @@ -262,7 +266,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(&capped) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -271,7 +275,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(ap_id.as_str()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -282,7 +286,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(actor_ap_url.as_str()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -290,7 +294,7 @@ impl ActivityPubRepository for PgActivityPubRepository { let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thoughts WHERE local=true") .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(n as u64) } } diff --git a/crates/adapters/postgres/src/api_key.rs b/crates/adapters/postgres/src/api_key.rs index 9ff3049..4d6cca8 100644 --- a/crates/adapters/postgres/src/api_key.rs +++ b/crates/adapters/postgres/src/api_key.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -30,7 +31,7 @@ impl ApiKeyRepository for PgApiKeyRepository { .bind(k.created_at) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -49,7 +50,7 @@ impl ApiKeyRepository for PgApiKeyRepository { .bind(hash) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| { o.map(|r| ApiKey { id: ApiKeyId::from_uuid(r.id), @@ -72,7 +73,7 @@ impl ApiKeyRepository for PgApiKeyRepository { } sqlx::query_as::<_, Row>("SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE user_id=$1 ORDER BY created_at DESC") .bind(user_id.as_uuid()).fetch_all(&self.pool).await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|rows| rows.into_iter().map(|r| ApiKey { id: ApiKeyId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), key_hash: r.key_hash, name: r.name, created_at: r.created_at }).collect()) } @@ -82,7 +83,7 @@ impl ApiKeyRepository for PgApiKeyRepository { .bind(user_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } } diff --git a/crates/adapters/postgres/src/block.rs b/crates/adapters/postgres/src/block.rs index 9a2c434..53be416 100644 --- a/crates/adapters/postgres/src/block.rs +++ b/crates/adapters/postgres/src/block.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use domain::{ errors::DomainError, models::social::Block, ports::BlockRepository, value_objects::UserId, @@ -24,7 +25,7 @@ impl BlockRepository for PgBlockRepository { .bind(b.created_at) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -34,7 +35,7 @@ impl BlockRepository for PgBlockRepository { .bind(blocked_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -45,7 +46,7 @@ impl BlockRepository for PgBlockRepository { .bind(blocked_id.as_uuid()) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(count > 0) } } @@ -53,22 +54,9 @@ impl BlockRepository for PgBlockRepository { #[cfg(test)] mod tests { use super::*; - use crate::user::PgUserRepository; + use crate::test_helpers::seed_user; use chrono::Utc; - use domain::ports::UserRepository; - use domain::{models::user::User, value_objects::*}; - - async fn seed_user(pool: &sqlx::PgPool, username: &str, email: &str) -> User { - let repo = PgUserRepository::new(pool.clone()); - let u = User::new_local( - UserId::new(), - Username::new(username).unwrap(), - Email::new(email).unwrap(), - PasswordHash("h".into()), - ); - repo.save(&u).await.unwrap(); - u - } + use domain::value_objects::*; #[sqlx::test(migrations = "./migrations")] async fn block_exists(pool: sqlx::PgPool) { diff --git a/crates/adapters/postgres/src/boost.rs b/crates/adapters/postgres/src/boost.rs index 20828a6..bc1e8b9 100644 --- a/crates/adapters/postgres/src/boost.rs +++ b/crates/adapters/postgres/src/boost.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -24,7 +25,7 @@ impl BoostRepository for PgBoostRepository { "INSERT INTO boosts(id,user_id,thought_id,ap_id,created_at) VALUES($1,$2,$3,$4,$5) ON CONFLICT(user_id,thought_id) DO NOTHING" ) .bind(b.id.as_uuid()).bind(b.user_id.as_uuid()).bind(b.thought_id.as_uuid()).bind(&b.ap_id).bind(b.created_at) - .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + .execute(&self.pool).await.into_domain().map(|_| ()) } async fn delete(&self, user_id: &UserId, thought_id: &ThoughtId) -> Result<(), DomainError> { @@ -33,7 +34,7 @@ impl BoostRepository for PgBoostRepository { .bind(thought_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; if r.rows_affected() == 0 { return Err(DomainError::NotFound); } @@ -56,7 +57,7 @@ impl BoostRepository for PgBoostRepository { sqlx::query_as::<_, Row>("SELECT id,user_id,thought_id,ap_id,created_at FROM boosts WHERE user_id=$1 AND thought_id=$2") .bind(user_id.as_uuid()).bind(thought_id.as_uuid()) .fetch_optional(&self.pool).await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(|r| Boost { id: BoostId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), thought_id: ThoughtId::from_uuid(r.thought_id), ap_id: r.ap_id, created_at: r.created_at })) } @@ -65,50 +66,20 @@ impl BoostRepository for PgBoostRepository { .bind(thought_id.as_uuid()) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() } } #[cfg(test)] mod tests { use super::*; - use crate::{thought::PgThoughtRepository, user::PgUserRepository}; + use crate::test_helpers::seed_user_and_thought; use chrono::Utc; - use domain::ports::{ThoughtRepository, UserRepository}; - use domain::{ - models::{ - thought::{Thought, Visibility}, - user::User, - }, - value_objects::*, - }; - - async fn seed(pool: &sqlx::PgPool) -> (User, Thought) { - let urepo = PgUserRepository::new(pool.clone()); - let trepo = PgThoughtRepository::new(pool.clone()); - let u = User::new_local( - UserId::new(), - Username::new("alice").unwrap(), - Email::new("alice@ex.com").unwrap(), - PasswordHash("h".into()), - ); - urepo.save(&u).await.unwrap(); - let t = Thought::new_local( - ThoughtId::new(), - u.id.clone(), - Content::new_local("hi").unwrap(), - None, - Visibility::Public, - None, - false, - ); - trepo.save(&t).await.unwrap(); - (u, t) - } + use domain::value_objects::*; #[sqlx::test(migrations = "./migrations")] async fn boost_and_count(pool: sqlx::PgPool) { - let (user, thought) = seed(&pool).await; + let (user, thought) = seed_user_and_thought(&pool).await; let repo = PgBoostRepository::new(pool); let boost = Boost { id: BoostId::new(), @@ -123,7 +94,7 @@ mod tests { #[sqlx::test(migrations = "./migrations")] async fn unboost(pool: sqlx::PgPool) { - let (user, thought) = seed(&pool).await; + let (user, thought) = seed_user_and_thought(&pool).await; let repo = PgBoostRepository::new(pool); let boost = Boost { id: BoostId::new(), diff --git a/crates/adapters/postgres/src/db_error.rs b/crates/adapters/postgres/src/db_error.rs new file mode 100644 index 0000000..74bb95f --- /dev/null +++ b/crates/adapters/postgres/src/db_error.rs @@ -0,0 +1,11 @@ +use domain::errors::DomainError; + +pub(crate) trait IntoDbResult { + fn into_domain(self) -> Result; +} + +impl IntoDbResult for Result { + fn into_domain(self) -> Result { + self.map_err(|e| DomainError::Internal(e.to_string())) + } +} diff --git a/crates/adapters/postgres/src/feed.rs b/crates/adapters/postgres/src/feed.rs index 393f313..14c0f0d 100644 --- a/crates/adapters/postgres/src/feed.rs +++ b/crates/adapters/postgres/src/feed.rs @@ -1,21 +1,12 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; -fn visibility_from_str(s: &str) -> domain::models::thought::Visibility { - use domain::models::thought::Visibility; - match s { - "followers" => Visibility::Followers, - "unlisted" => Visibility::Unlisted, - "direct" => Visibility::Direct, - _ => Visibility::Public, - } -} - use domain::{ errors::DomainError, models::{ feed::{FeedEntry, PageParams, Paginated}, - thought::Thought, + thought::{Thought, Visibility}, user::User, }, ports::FeedRepository, @@ -127,7 +118,7 @@ fn row_to_entry(r: FeedRow) -> FeedEntry { in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), in_reply_to_url: r.in_reply_to_url, ap_id: r.t_ap_id, - visibility: visibility_from_str(&r.visibility), + visibility: Visibility::from_db_str(&r.visibility), content_warning: r.content_warning, sensitive: r.sensitive, local: r.t_local, @@ -180,7 +171,7 @@ impl FeedRepository for PgFeedRepository { .bind(&ids) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let sel = feed_select(viewer); let sql = format!("{sel} WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3", fed_clause); @@ -190,7 +181,7 @@ impl FeedRepository for PgFeedRepository { .bind(page.offset()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Paginated { items: rows.into_iter().map(row_to_entry).collect(), @@ -211,7 +202,7 @@ impl FeedRepository for PgFeedRepository { ) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let sel = feed_select(viewer); let sql = format!("{sel} WHERE t.local=true AND t.visibility='public' ORDER BY t.created_at DESC LIMIT $1 OFFSET $2"); @@ -220,7 +211,7 @@ impl FeedRepository for PgFeedRepository { .bind(page.offset()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Paginated { items: rows.into_iter().map(row_to_entry).collect(), @@ -243,7 +234,7 @@ impl FeedRepository for PgFeedRepository { .bind(query) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let sel = feed_select(viewer); let sql = format!("{sel} WHERE t.content % $1 AND t.visibility='public' ORDER BY similarity(t.content, $1) DESC LIMIT $2 OFFSET $3"); @@ -253,7 +244,7 @@ impl FeedRepository for PgFeedRepository { .bind(page.offset()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Paginated { items: rows.into_iter().map(row_to_entry).collect(), @@ -279,7 +270,7 @@ impl FeedRepository for PgFeedRepository { .bind(tag_name) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let sel = feed_select(viewer); let sql = format!( @@ -295,7 +286,7 @@ impl FeedRepository for PgFeedRepository { .bind(page.offset()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Paginated { items: rows.into_iter().map(row_to_entry).collect(), @@ -324,7 +315,7 @@ impl FeedRepository for PgFeedRepository { .bind(viewer_uuid) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let sel = feed_select(viewer); let sql = format!("{sel} WHERE t.user_id = $1 AND ($4::uuid = $1 OR (t.visibility != 'direct' AND (t.visibility IN ('public', 'unlisted') OR (t.visibility = 'followers' AND EXISTS(SELECT 1 FROM follows WHERE follower_id = $4 AND following_id = $1 AND state = 'accepted'))))) ORDER BY t.created_at DESC LIMIT $2 OFFSET $3"); @@ -335,7 +326,7 @@ impl FeedRepository for PgFeedRepository { .bind(viewer_uuid) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Paginated { items: rows.into_iter().map(row_to_entry).collect(), diff --git a/crates/adapters/postgres/src/follow.rs b/crates/adapters/postgres/src/follow.rs index c1d542c..0aeba1b 100644 --- a/crates/adapters/postgres/src/follow.rs +++ b/crates/adapters/postgres/src/follow.rs @@ -1,24 +1,7 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; -fn follow_state_from_str(s: &str) -> domain::models::social::FollowState { - use domain::models::social::FollowState; - match s { - "pending" => FollowState::Pending, - "rejected" => FollowState::Rejected, - _ => FollowState::Accepted, - } -} - -fn follow_state_as_str(state: &domain::models::social::FollowState) -> &'static str { - use domain::models::social::FollowState; - match state { - FollowState::Pending => "pending", - FollowState::Accepted => "accepted", - FollowState::Rejected => "rejected", - } -} - use domain::{ errors::DomainError, models::{ @@ -50,12 +33,12 @@ impl FollowRepository for PgFollowRepository { ) .bind(f.follower_id.as_uuid()) .bind(f.following_id.as_uuid()) - .bind(follow_state_as_str(&f.state)) + .bind(f.state.as_str()) .bind(&f.ap_id) .bind(f.created_at) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -65,7 +48,7 @@ impl FollowRepository for PgFollowRepository { .bind(following_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; if r.rows_affected() == 0 { return Err(DomainError::NotFound); } @@ -92,11 +75,11 @@ impl FollowRepository for PgFollowRepository { .bind(following_id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(|r| Follow { follower_id: UserId::from_uuid(r.follower_id), following_id: UserId::from_uuid(r.following_id), - state: follow_state_from_str(&r.state), + state: FollowState::from_db_str(&r.state), ap_id: r.ap_id, created_at: r.created_at, })) @@ -111,10 +94,10 @@ impl FollowRepository for PgFollowRepository { sqlx::query("UPDATE follows SET state=$3 WHERE follower_id=$1 AND following_id=$2") .bind(follower_id.as_uuid()) .bind(following_id.as_uuid()) - .bind(follow_state_as_str(state)) + .bind(state.as_str()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -129,7 +112,7 @@ impl FollowRepository for PgFollowRepository { .bind(user_id.as_uuid()) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let rows = sqlx::query_as::<_, crate::user::UserRow>( "SELECT u.id,u.username,u.email,u.password_hash,u.display_name,u.bio,u.avatar_url,u.header_url,u.custom_css,u.local,u.ap_id,u.inbox_url,u.created_at,u.updated_at @@ -142,7 +125,7 @@ impl FollowRepository for PgFollowRepository { .bind(page.offset()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Paginated { items: rows.into_iter().map(User::from).collect(), @@ -163,7 +146,7 @@ impl FollowRepository for PgFollowRepository { .bind(user_id.as_uuid()) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let rows = sqlx::query_as::<_, crate::user::UserRow>( "SELECT u.id,u.username,u.email,u.password_hash,u.display_name,u.bio,u.avatar_url,u.header_url,u.custom_css,u.local,u.ap_id,u.inbox_url,u.created_at,u.updated_at @@ -176,7 +159,7 @@ impl FollowRepository for PgFollowRepository { .bind(page.offset()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Paginated { items: rows.into_iter().map(User::from).collect(), @@ -196,7 +179,7 @@ impl FollowRepository for PgFollowRepository { .bind(user_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(ids.into_iter().map(UserId::from_uuid).collect()) } } @@ -204,22 +187,9 @@ impl FollowRepository for PgFollowRepository { #[cfg(test)] mod tests { use super::*; - use crate::user::PgUserRepository; + use crate::test_helpers::seed_user; use chrono::Utc; - use domain::ports::UserRepository; - use domain::{models::user::User, value_objects::*}; - - async fn seed_user(pool: &sqlx::PgPool, username: &str, email: &str) -> User { - let repo = PgUserRepository::new(pool.clone()); - let u = User::new_local( - UserId::new(), - Username::new(username).unwrap(), - Email::new(email).unwrap(), - PasswordHash("h".into()), - ); - repo.save(&u).await.unwrap(); - u - } + use domain::value_objects::*; #[sqlx::test(migrations = "./migrations")] async fn save_and_find_follow(pool: sqlx::PgPool) { diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index dfe8a56..d22cd34 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -2,6 +2,7 @@ pub mod activitypub; pub mod api_key; pub mod block; pub mod boost; +mod db_error; pub mod feed; pub mod follow; pub mod like; @@ -9,6 +10,8 @@ pub mod notification; pub mod remote_actor; pub mod remote_actor_connections; pub mod tag; +#[cfg(test)] +pub(crate) mod test_helpers; pub mod thought; pub mod top_friend; pub mod user; diff --git a/crates/adapters/postgres/src/like.rs b/crates/adapters/postgres/src/like.rs index edb8ac5..43d5d59 100644 --- a/crates/adapters/postgres/src/like.rs +++ b/crates/adapters/postgres/src/like.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -24,7 +25,7 @@ impl LikeRepository for PgLikeRepository { "INSERT INTO likes(id,user_id,thought_id,ap_id,created_at) VALUES($1,$2,$3,$4,$5) ON CONFLICT(user_id,thought_id) DO NOTHING" ) .bind(l.id.as_uuid()).bind(l.user_id.as_uuid()).bind(l.thought_id.as_uuid()).bind(&l.ap_id).bind(l.created_at) - .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + .execute(&self.pool).await.into_domain().map(|_| ()) } async fn delete(&self, user_id: &UserId, thought_id: &ThoughtId) -> Result<(), DomainError> { @@ -33,7 +34,7 @@ impl LikeRepository for PgLikeRepository { .bind(thought_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; if r.rows_affected() == 0 { return Err(DomainError::NotFound); } @@ -56,7 +57,7 @@ impl LikeRepository for PgLikeRepository { sqlx::query_as::<_, Row>("SELECT id,user_id,thought_id,ap_id,created_at FROM likes WHERE user_id=$1 AND thought_id=$2") .bind(user_id.as_uuid()).bind(thought_id.as_uuid()) .fetch_optional(&self.pool).await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(|r| Like { id: LikeId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), thought_id: ThoughtId::from_uuid(r.thought_id), ap_id: r.ap_id, created_at: r.created_at })) } @@ -65,50 +66,20 @@ impl LikeRepository for PgLikeRepository { .bind(thought_id.as_uuid()) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() } } #[cfg(test)] mod tests { use super::*; - use crate::{thought::PgThoughtRepository, user::PgUserRepository}; + use crate::test_helpers::seed_user_and_thought; use chrono::Utc; - use domain::ports::{ThoughtRepository, UserRepository}; - use domain::{ - models::{ - thought::{Thought, Visibility}, - user::User, - }, - value_objects::*, - }; - - async fn seed(pool: &sqlx::PgPool) -> (User, Thought) { - let urepo = PgUserRepository::new(pool.clone()); - let trepo = PgThoughtRepository::new(pool.clone()); - let u = User::new_local( - UserId::new(), - Username::new("alice").unwrap(), - Email::new("alice@ex.com").unwrap(), - PasswordHash("h".into()), - ); - urepo.save(&u).await.unwrap(); - let t = Thought::new_local( - ThoughtId::new(), - u.id.clone(), - Content::new_local("hi").unwrap(), - None, - Visibility::Public, - None, - false, - ); - trepo.save(&t).await.unwrap(); - (u, t) - } + use domain::value_objects::*; #[sqlx::test(migrations = "./migrations")] async fn like_and_count(pool: sqlx::PgPool) { - let (user, thought) = seed(&pool).await; + let (user, thought) = seed_user_and_thought(&pool).await; let repo = PgLikeRepository::new(pool); let like = Like { id: LikeId::new(), @@ -123,7 +94,7 @@ mod tests { #[sqlx::test(migrations = "./migrations")] async fn unlike(pool: sqlx::PgPool) { - let (user, thought) = seed(&pool).await; + let (user, thought) = seed_user_and_thought(&pool).await; let repo = PgLikeRepository::new(pool); let like = Like { id: LikeId::new(), diff --git a/crates/adapters/postgres/src/notification.rs b/crates/adapters/postgres/src/notification.rs index 6b6c92d..bf4ed6d 100644 --- a/crates/adapters/postgres/src/notification.rs +++ b/crates/adapters/postgres/src/notification.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -53,7 +54,7 @@ impl NotificationRepository for PgNotificationRepository { .bind(n.from_user_id.as_ref().map(|u| u.as_uuid())) .bind(n.thought_id.as_ref().map(|t| t.as_uuid())) .bind(n.read).bind(n.created_at) - .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + .execute(&self.pool).await.into_domain().map(|_| ()) } async fn list_for_user( @@ -65,7 +66,7 @@ impl NotificationRepository for PgNotificationRepository { .bind(user_id.as_uuid()) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; #[derive(sqlx::FromRow)] struct Row { id: uuid::Uuid, @@ -79,7 +80,7 @@ impl NotificationRepository for PgNotificationRepository { let rows = sqlx::query_as::<_, Row>( "SELECT id,user_id,type,from_user_id,thought_id,read,created_at FROM notifications WHERE user_id=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3" ).bind(user_id.as_uuid()).bind(page.limit()).bind(page.offset()) - .fetch_all(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + .fetch_all(&self.pool).await.into_domain()?; let items = rows .into_iter() .map(|r| Notification { @@ -107,7 +108,7 @@ impl NotificationRepository for PgNotificationRepository { .bind(user_id.as_uuid()) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(count as u64) } @@ -117,7 +118,7 @@ impl NotificationRepository for PgNotificationRepository { .bind(user_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -126,7 +127,7 @@ impl NotificationRepository for PgNotificationRepository { .bind(user_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } } diff --git a/crates/adapters/postgres/src/remote_actor.rs b/crates/adapters/postgres/src/remote_actor.rs index 20239e8..47f925d 100644 --- a/crates/adapters/postgres/src/remote_actor.rs +++ b/crates/adapters/postgres/src/remote_actor.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -26,7 +27,7 @@ impl RemoteActorRepository for PgRemoteActorRepository { ) .bind(&a.url).bind(&a.handle).bind(&a.display_name).bind(&a.inbox_url) .bind(&a.shared_inbox_url).bind(&a.public_key).bind(&a.avatar_url).bind(a.last_fetched_at) - .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + .execute(&self.pool).await.into_domain().map(|_| ()) } async fn find_by_url(&self, url: &str) -> Result, DomainError> { @@ -44,7 +45,7 @@ impl RemoteActorRepository for PgRemoteActorRepository { sqlx::query_as::<_, Row>( "SELECT url,handle,display_name,inbox_url,shared_inbox_url,public_key,avatar_url,last_fetched_at FROM remote_actors WHERE url=$1" ).bind(url).fetch_optional(&self.pool).await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(|r| RemoteActor { url: r.url, handle: r.handle, display_name: r.display_name, inbox_url: r.inbox_url, shared_inbox_url: r.shared_inbox_url, public_key: r.public_key, avatar_url: r.avatar_url, last_fetched_at: r.last_fetched_at, bio: None, banner_url: None, also_known_as: None, outbox_url: None, followers_url: None, following_url: None, attachment: vec![] })) } } diff --git a/crates/adapters/postgres/src/remote_actor_connections.rs b/crates/adapters/postgres/src/remote_actor_connections.rs index 6259795..c2a92c9 100644 --- a/crates/adapters/postgres/src/remote_actor_connections.rs +++ b/crates/adapters/postgres/src/remote_actor_connections.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use domain::{ errors::DomainError, models::actor_connection_summary::ActorConnectionSummary, @@ -46,7 +47,7 @@ impl RemoteActorConnectionRepository for PgRemoteActorConnectionRepository { .bind(&actor.avatar_url) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; } Ok(()) } @@ -75,7 +76,7 @@ impl RemoteActorConnectionRepository for PgRemoteActorConnectionRepository { .bind(page as i32) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(rows .into_iter() @@ -103,7 +104,7 @@ impl RemoteActorConnectionRepository for PgRemoteActorConnectionRepository { .bind(page as i32) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(row.and_then(|(ts,)| ts)) } diff --git a/crates/adapters/postgres/src/tag.rs b/crates/adapters/postgres/src/tag.rs index 9363408..8fa7451 100644 --- a/crates/adapters/postgres/src/tag.rs +++ b/crates/adapters/postgres/src/tag.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use domain::{ errors::DomainError, @@ -28,7 +29,7 @@ impl TagRepository for PgTagRepository { .bind(&name) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; #[derive(sqlx::FromRow)] struct Row { id: i32, @@ -38,7 +39,7 @@ impl TagRepository for PgTagRepository { .bind(&name) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Tag { id: row.id, name: row.name, @@ -57,7 +58,7 @@ impl TagRepository for PgTagRepository { .bind(tag_id) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -66,7 +67,7 @@ impl TagRepository for PgTagRepository { .bind(thought_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -79,7 +80,7 @@ impl TagRepository for PgTagRepository { sqlx::query_as::<_, Row>( "SELECT t.id,t.name FROM tags t JOIN thought_tags tt ON tt.tag_id=t.id WHERE tt.thought_id=$1" ).bind(thought_id.as_uuid()).fetch_all(&self.pool).await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|rows| rows.into_iter().map(|r| Tag { id: r.id, name: r.name }).collect()) } @@ -94,14 +95,14 @@ impl TagRepository for PgTagRepository { .bind(tag_name) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let rows = sqlx::query_as::<_, crate::thought::ThoughtRow>( "SELECT th.id,th.user_id,th.content,th.in_reply_to_id,th.in_reply_to_url,th.ap_id,th.visibility,th.content_warning,th.sensitive,th.local,th.created_at,th.updated_at FROM thoughts th JOIN thought_tags tt ON tt.thought_id=th.id JOIN tags t ON t.id=tt.tag_id WHERE t.name=$1 ORDER BY th.created_at DESC LIMIT $2 OFFSET $3" ).bind(tag_name).bind(page.limit()).bind(page.offset()) - .fetch_all(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + .fetch_all(&self.pool).await.into_domain()?; Ok(Paginated { items: rows.into_iter().map(Thought::from).collect(), @@ -123,7 +124,7 @@ impl TagRepository for PgTagRepository { .bind(limit as i64) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() } } diff --git a/crates/adapters/postgres/src/test_helpers.rs b/crates/adapters/postgres/src/test_helpers.rs new file mode 100644 index 0000000..712ee5a --- /dev/null +++ b/crates/adapters/postgres/src/test_helpers.rs @@ -0,0 +1,37 @@ +use crate::{thought::PgThoughtRepository, user::PgUserRepository}; +use domain::{ + models::{ + thought::{Thought, Visibility}, + user::User, + }, + ports::{ThoughtRepository, UserRepository}, + value_objects::{Content, Email, PasswordHash, ThoughtId, UserId, Username}, +}; + +pub async fn seed_user(pool: &sqlx::PgPool, username: &str, email: &str) -> User { + let repo = PgUserRepository::new(pool.clone()); + let u = User::new_local( + UserId::new(), + Username::new(username).unwrap(), + Email::new(email).unwrap(), + PasswordHash("h".into()), + ); + repo.save(&u).await.unwrap(); + u +} + +pub async fn seed_user_and_thought(pool: &sqlx::PgPool) -> (User, Thought) { + let user = seed_user(pool, "alice", "alice@ex.com").await; + let trepo = PgThoughtRepository::new(pool.clone()); + let t = Thought::new_local( + ThoughtId::new(), + user.id.clone(), + Content::new_local("hi").unwrap(), + None, + Visibility::Public, + None, + false, + ); + trepo.save(&t).await.unwrap(); + (user, t) +} diff --git a/crates/adapters/postgres/src/thought.rs b/crates/adapters/postgres/src/thought.rs index 70e4c74..ed193d8 100644 --- a/crates/adapters/postgres/src/thought.rs +++ b/crates/adapters/postgres/src/thought.rs @@ -1,31 +1,12 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; -fn visibility_from_str(s: &str) -> domain::models::thought::Visibility { - use domain::models::thought::Visibility; - match s { - "followers" => Visibility::Followers, - "unlisted" => Visibility::Unlisted, - "direct" => Visibility::Direct, - _ => Visibility::Public, - } -} - -fn visibility_as_str(v: &domain::models::thought::Visibility) -> &'static str { - use domain::models::thought::Visibility; - match v { - Visibility::Public => "public", - Visibility::Followers => "followers", - Visibility::Unlisted => "unlisted", - Visibility::Direct => "direct", - } -} - use domain::{ errors::DomainError, models::{ feed::{PageParams, Paginated}, - thought::Thought, + thought::{Thought, Visibility}, }, ports::ThoughtRepository, value_objects::{Content, ThoughtId, UserId}, @@ -66,7 +47,7 @@ impl From for Thought { in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), in_reply_to_url: r.in_reply_to_url, ap_id: r.ap_id, - visibility: visibility_from_str(&r.visibility), + visibility: Visibility::from_db_str(&r.visibility), content_warning: r.content_warning, sensitive: r.sensitive, local: r.local, @@ -93,14 +74,14 @@ impl ThoughtRepository for PgThoughtRepository { .bind(t.in_reply_to_id.as_ref().map(|x| x.as_uuid())) .bind(&t.in_reply_to_url) .bind(&t.ap_id) - .bind(visibility_as_str(&t.visibility)) + .bind(t.visibility.as_str()) .bind(&t.content_warning) .bind(t.sensitive) .bind(t.local) .bind(t.created_at) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -109,7 +90,7 @@ impl ThoughtRepository for PgThoughtRepository { .bind(id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(Thought::from)) } @@ -119,7 +100,7 @@ impl ThoughtRepository for PgThoughtRepository { .bind(user_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; if r.rows_affected() == 0 { return Err(DomainError::NotFound); } @@ -132,7 +113,7 @@ impl ThoughtRepository for PgThoughtRepository { .bind(content.as_str()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -153,7 +134,7 @@ impl ThoughtRepository for PgThoughtRepository { .bind(id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|rows| rows.into_iter().map(Thought::from).collect()) } @@ -167,7 +148,7 @@ impl ThoughtRepository for PgThoughtRepository { .bind(uid) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; let rows = sqlx::query_as::<_, ThoughtRow>(&format!( "{THOUGHT_SELECT} WHERE user_id=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3" @@ -177,7 +158,7 @@ impl ThoughtRepository for PgThoughtRepository { .bind(page.offset()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(Paginated { items: rows.into_iter().map(Thought::from).collect(), @@ -191,28 +172,12 @@ impl ThoughtRepository for PgThoughtRepository { #[cfg(test)] mod tests { use super::*; - use crate::user::PgUserRepository; - use domain::ports::UserRepository; + use crate::test_helpers::seed_user; use domain::{ - models::{ - thought::{Thought, Visibility}, - user::User, - }, + models::thought::{Thought, Visibility}, value_objects::*, }; - async fn seed_user(pool: &sqlx::PgPool, username: &str, email: &str) -> User { - let repo = PgUserRepository::new(pool.clone()); - let u = User::new_local( - UserId::new(), - Username::new(username).unwrap(), - Email::new(email).unwrap(), - PasswordHash("h".into()), - ); - repo.save(&u).await.unwrap(); - u - } - #[sqlx::test(migrations = "./migrations")] async fn save_and_find_thought(pool: sqlx::PgPool) { let user = seed_user(&pool, "alice", "alice@ex.com").await; diff --git a/crates/adapters/postgres/src/top_friend.rs b/crates/adapters/postgres/src/top_friend.rs index 1594feb..00c1356 100644 --- a/crates/adapters/postgres/src/top_friend.rs +++ b/crates/adapters/postgres/src/top_friend.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use domain::{ errors::DomainError, @@ -23,16 +24,12 @@ impl TopFriendRepository for PgTopFriendRepository { user_id: &UserId, friends: Vec<(UserId, i16)>, ) -> Result<(), DomainError> { - let mut tx = self - .pool - .begin() - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + let mut tx = self.pool.begin().await.into_domain()?; sqlx::query("DELETE FROM top_friends WHERE user_id=$1") .bind(user_id.as_uuid()) .execute(&mut *tx) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; for (friend_id, pos) in friends { sqlx::query("INSERT INTO top_friends(user_id,friend_id,position) VALUES($1,$2,$3)") .bind(user_id.as_uuid()) @@ -40,11 +37,9 @@ impl TopFriendRepository for PgTopFriendRepository { .bind(pos) .execute(&mut *tx) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; } - tx.commit() - .await - .map_err(|e| DomainError::Internal(e.to_string())) + tx.commit().await.into_domain() } async fn list_for_user(&self, user_id: &UserId) -> Result, DomainError> { @@ -79,7 +74,7 @@ impl TopFriendRepository for PgTopFriendRepository { .bind(user_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(rows .into_iter() diff --git a/crates/adapters/postgres/src/user.rs b/crates/adapters/postgres/src/user.rs index 4b35cee..85b1ed2 100644 --- a/crates/adapters/postgres/src/user.rs +++ b/crates/adapters/postgres/src/user.rs @@ -1,3 +1,4 @@ +use crate::db_error::IntoDbResult; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -18,7 +19,7 @@ impl PgUserRepository { } #[derive(sqlx::FromRow)] -pub(crate) struct UserRow { +pub struct UserRow { pub id: uuid::Uuid, pub username: String, pub email: String, @@ -56,7 +57,7 @@ impl From for User { } } -const USER_SELECT: &str = "SELECT id,username,email,password_hash,display_name,bio,avatar_url,header_url,custom_css,local,ap_id,inbox_url,created_at,updated_at FROM users"; +pub const USER_SELECT: &str = "SELECT id,username,email,password_hash,display_name,bio,avatar_url,header_url,custom_css,local,ap_id,inbox_url,created_at,updated_at FROM users"; #[async_trait] impl UserRepository for PgUserRepository { @@ -65,7 +66,7 @@ impl UserRepository for PgUserRepository { .bind(id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(User::from)) } @@ -74,7 +75,7 @@ impl UserRepository for PgUserRepository { .bind(username.as_str()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(User::from)) } @@ -83,7 +84,7 @@ impl UserRepository for PgUserRepository { .bind(email.as_str()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|o| o.map(User::from)) } @@ -115,7 +116,7 @@ impl UserRepository for PgUserRepository { .bind(user.updated_at) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -139,7 +140,7 @@ impl UserRepository for PgUserRepository { .bind(custom_css) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() .map(|_| ()) } @@ -170,7 +171,7 @@ impl UserRepository for PgUserRepository { ) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .into_domain()?; Ok(rows .into_iter() @@ -191,7 +192,7 @@ impl UserRepository for PgUserRepository { sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM users WHERE local = true") .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .into_domain() } } diff --git a/crates/api-types/src/requests.rs b/crates/api-types/src/requests.rs index 160e702..c24f231 100644 --- a/crates/api-types/src/requests.rs +++ b/crates/api-types/src/requests.rs @@ -1,6 +1,10 @@ use serde::Deserialize; use uuid::Uuid; +pub const DEFAULT_PAGE: u64 = 1; +pub const DEFAULT_PER_PAGE: u64 = 20; +pub const MAX_PER_PAGE: u64 = 100; + #[derive(Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct RegisterRequest { @@ -66,11 +70,11 @@ pub struct PaginationQuery { impl PaginationQuery { pub fn page(&self) -> u64 { - self.page.unwrap_or(1).max(1) + self.page.unwrap_or(DEFAULT_PAGE).max(DEFAULT_PAGE) } pub fn per_page(&self) -> u64 { - self.per_page.unwrap_or(20).min(100) + self.per_page.unwrap_or(DEFAULT_PER_PAGE).min(MAX_PER_PAGE) } } diff --git a/crates/application/src/use_cases/profile.rs b/crates/application/src/use_cases/profile.rs index 4ebbc83..5ed9057 100644 --- a/crates/application/src/use_cases/profile.rs +++ b/crates/application/src/use_cases/profile.rs @@ -1,3 +1,5 @@ +const MAX_TOP_FRIENDS: usize = 8; + use domain::{ errors::DomainError, events::DomainEvent, @@ -79,7 +81,7 @@ pub async fn set_top_friends( user_id: &UserId, friend_ids: Vec, ) -> Result<(), DomainError> { - if friend_ids.len() > 8 { + if friend_ids.len() > MAX_TOP_FRIENDS { return Err(DomainError::InvalidInput("top friends: max 8".into())); } let friends: Vec<(UserId, i16)> = friend_ids diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index e5fd0b9..cbce2b0 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -1,3 +1,5 @@ +const JWT_TTL_SECS: i64 = 86_400 * 30; + use async_trait::async_trait; use sqlx::PgPool; use std::sync::Arc; @@ -107,7 +109,7 @@ pub async fn build(cfg: &Config) -> Infrastructure { search: Arc::new(postgres_search::PgSearchRepository::new(pool.clone())), auth: Arc::new(auth::JwtAuthService::new( cfg.jwt_secret.clone(), - 86400 * 30, + JWT_TTL_SECS, )), hasher: Arc::new(auth::Argon2PasswordHasher), events: event_publisher, diff --git a/crates/bootstrap/src/main.rs b/crates/bootstrap/src/main.rs index 44a8958..1b6f9b3 100644 --- a/crates/bootstrap/src/main.rs +++ b/crates/bootstrap/src/main.rs @@ -1,6 +1,9 @@ mod config; mod factory; +const MS_PER_MINUTE: u64 = 60_000; +const RATE_LIMITER_CLEANUP_INTERVAL_SECS: u64 = 60; + use std::net::SocketAddr; use std::sync::Arc; use tower_http::cors::{AllowOrigin, CorsLayer}; @@ -46,7 +49,7 @@ async fn main() { // per_millisecond sets the token replenishment interval. // rate_limit = max requests/minute => replenish every (60000 / rate_limit) ms. - let ms = (60_000u64).saturating_div(rate_limit as u64).max(1); + let ms = MS_PER_MINUTE.saturating_div(rate_limit as u64).max(1); let governor_conf = Arc::new( GovernorConfigBuilder::default() .per_millisecond(ms) @@ -58,7 +61,9 @@ async fn main() { let limiter = governor_conf.limiter().clone(); tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); + let mut interval = tokio::time::interval(std::time::Duration::from_secs( + RATE_LIMITER_CLEANUP_INTERVAL_SECS, + )); loop { interval.tick().await; limiter.retain_recent(); diff --git a/crates/domain/src/models/social.rs b/crates/domain/src/models/social.rs index 12b62bc..8ac4df0 100644 --- a/crates/domain/src/models/social.rs +++ b/crates/domain/src/models/social.rs @@ -26,6 +26,24 @@ pub enum FollowState { Rejected, } +impl FollowState { + pub fn as_str(&self) -> &'static str { + match self { + Self::Pending => "pending", + Self::Accepted => "accepted", + Self::Rejected => "rejected", + } + } + + pub fn from_db_str(s: &str) -> Self { + match s { + "pending" => Self::Pending, + "rejected" => Self::Rejected, + _ => Self::Accepted, + } + } +} + #[derive(Debug, Clone)] pub struct Follow { pub follower_id: UserId, diff --git a/crates/domain/src/models/thought.rs b/crates/domain/src/models/thought.rs index c038758..d9980f7 100644 --- a/crates/domain/src/models/thought.rs +++ b/crates/domain/src/models/thought.rs @@ -34,6 +34,15 @@ impl Visibility { Visibility::Direct => "direct", } } + + pub fn from_db_str(s: &str) -> Self { + match s { + "followers" => Self::Followers, + "unlisted" => Self::Unlisted, + "direct" => Self::Direct, + _ => Self::Public, + } + } } impl Thought { diff --git a/crates/domain/src/value_objects.rs b/crates/domain/src/value_objects.rs index c35651c..7d3f3a8 100644 --- a/crates/domain/src/value_objects.rs +++ b/crates/domain/src/value_objects.rs @@ -1,6 +1,10 @@ use crate::errors::DomainError; use uuid::Uuid; +const MAX_USERNAME_LENGTH: usize = 32; +const MAX_EMAIL_LENGTH: usize = 255; +const MAX_CONTENT_LENGTH: usize = 128; + macro_rules! uuid_id { ($name:ident) => { #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] @@ -41,7 +45,7 @@ pub struct Username(String); impl Username { pub fn new(s: impl Into) -> Result { let s = s.into(); - if s.is_empty() || s.len() > 32 { + if s.is_empty() || s.len() > MAX_USERNAME_LENGTH { return Err(DomainError::InvalidInput("username: 1-32 chars".into())); } if !s @@ -72,7 +76,7 @@ pub struct Email(String); impl Email { pub fn new(s: impl Into) -> Result { let s = s.into().to_lowercase(); - if !s.contains('@') || s.len() > 255 { + if !s.contains('@') || s.len() > MAX_EMAIL_LENGTH { return Err(DomainError::InvalidInput("invalid email".into())); } Ok(Self(s)) @@ -93,7 +97,7 @@ pub struct Content(String); impl Content { pub fn new_local(s: impl Into) -> Result { let s = s.into(); - if s.is_empty() || s.len() > 128 { + if s.is_empty() || s.len() > MAX_CONTENT_LENGTH { return Err(DomainError::InvalidInput("content: 1-128 chars".into())); } Ok(Self(s)) diff --git a/crates/presentation/src/handlers/federation_actors.rs b/crates/presentation/src/handlers/federation_actors.rs index 4a17e89..5cef076 100644 --- a/crates/presentation/src/handlers/federation_actors.rs +++ b/crates/presentation/src/handlers/federation_actors.rs @@ -92,63 +92,10 @@ async fn actor_connections_handler( #[cfg(test)] mod tests { use super::*; - use async_trait::async_trait; + use crate::testing::make_state; use axum::{body::Body, http::Request, routing::get, Router}; - use domain::{ - errors::DomainError, - ports::{AuthService, GeneratedToken, PasswordHasher}, - testing::TestStore, - value_objects::{PasswordHash, UserId}, - }; - use std::sync::Arc; use tower::ServiceExt; - struct NoOpAuth; - impl AuthService for NoOpAuth { - fn generate_token(&self, _uid: &UserId) -> Result { - Err(DomainError::Internal("noop".into())) - } - fn validate_token(&self, _token: &str) -> Result { - Err(DomainError::Unauthorized) - } - } - - struct NoOpHasher; - #[async_trait] - impl PasswordHasher for NoOpHasher { - async fn hash(&self, _plain: &str) -> Result { - Err(DomainError::Internal("noop".into())) - } - async fn verify(&self, _plain: &str, _hash: &PasswordHash) -> Result { - Ok(false) - } - } - - fn make_state() -> crate::state::AppState { - let store = Arc::new(TestStore::default()); - crate::state::AppState { - users: store.clone(), - thoughts: store.clone(), - likes: store.clone(), - boosts: store.clone(), - follows: store.clone(), - blocks: store.clone(), - tags: store.clone(), - api_keys: store.clone(), - top_friends: store.clone(), - notifications: store.clone(), - remote_actors: store.clone(), - feed: store.clone(), - search: store.clone(), - auth: Arc::new(NoOpAuth), - hasher: Arc::new(NoOpHasher), - events: store.clone(), - federation: store.clone(), - ap_repo: store.clone(), - remote_actor_connections: store.clone(), - } - } - fn app() -> Router { Router::new() .route( diff --git a/crates/presentation/src/handlers/feed.rs b/crates/presentation/src/handlers/feed.rs index 7600ce3..db6907c 100644 --- a/crates/presentation/src/handlers/feed.rs +++ b/crates/presentation/src/handlers/feed.rs @@ -98,8 +98,8 @@ pub async fn search_handler( Query(q): Query, ) -> Result, ApiError> { let page = PageParams { - page: q.page.unwrap_or(1), - per_page: q.per_page.unwrap_or(20), + page: q.page.unwrap_or(api_types::requests::DEFAULT_PAGE), + per_page: q.per_page.unwrap_or(api_types::requests::DEFAULT_PER_PAGE), }; let query = q.q.trim().to_string(); @@ -257,8 +257,12 @@ pub async fn get_popular_tags( let limit: usize = params .get("limit") .and_then(|v| v.parse().ok()) - .unwrap_or(20); - let tags = uc_get_popular_tags(&*s.tags, limit.min(100)).await?; + .unwrap_or(api_types::requests::DEFAULT_PER_PAGE as usize); + let tags = uc_get_popular_tags( + &*s.tags, + limit.min(api_types::requests::MAX_PER_PAGE as usize), + ) + .await?; Ok(Json(serde_json::json!({ "tags": tags.iter().map(|(name, count)| serde_json::json!({ "name": name, diff --git a/crates/presentation/src/handlers/notifications.rs b/crates/presentation/src/handlers/notifications.rs index b5fcbd0..69b5e1a 100644 --- a/crates/presentation/src/handlers/notifications.rs +++ b/crates/presentation/src/handlers/notifications.rs @@ -59,68 +59,15 @@ pub async fn mark_all_read( #[cfg(test)] mod tests { use super::*; - use async_trait::async_trait; + use crate::testing::make_state; use axum::{ body::Body, http::{header, Request}, routing::{get, patch}, Router, }; - use domain::{ - errors::DomainError, - ports::{AuthService, GeneratedToken, PasswordHasher}, - testing::TestStore, - value_objects::{PasswordHash, UserId}, - }; - use std::sync::Arc; use tower::ServiceExt; - struct NoOpAuth; - impl AuthService for NoOpAuth { - fn generate_token(&self, _uid: &UserId) -> Result { - Err(DomainError::Internal("noop".into())) - } - fn validate_token(&self, _token: &str) -> Result { - Err(DomainError::Unauthorized) - } - } - - struct NoOpHasher; - #[async_trait] - impl PasswordHasher for NoOpHasher { - async fn hash(&self, _plain: &str) -> Result { - Err(DomainError::Internal("noop".into())) - } - async fn verify(&self, _plain: &str, _hash: &PasswordHash) -> Result { - Ok(false) - } - } - - fn make_state() -> crate::state::AppState { - let store = Arc::new(TestStore::default()); - crate::state::AppState { - users: store.clone(), - thoughts: store.clone(), - likes: store.clone(), - boosts: store.clone(), - follows: store.clone(), - blocks: store.clone(), - tags: store.clone(), - api_keys: store.clone(), - top_friends: store.clone(), - notifications: store.clone(), - remote_actors: store.clone(), - feed: store.clone(), - search: store.clone(), - auth: Arc::new(NoOpAuth), - hasher: Arc::new(NoOpHasher), - events: store.clone(), - federation: store.clone(), - ap_repo: store.clone(), - remote_actor_connections: store.clone(), - } - } - fn app() -> Router { Router::new() .route("/notifications", patch(mark_all_read)) diff --git a/crates/presentation/src/handlers/social.rs b/crates/presentation/src/handlers/social.rs index 6ed583b..632448e 100644 --- a/crates/presentation/src/handlers/social.rs +++ b/crates/presentation/src/handlers/social.rs @@ -132,68 +132,15 @@ pub async fn get_top_friends_handler( #[cfg(test)] mod tests { use super::*; - use async_trait::async_trait; + use crate::testing::make_state; use axum::{ body::Body, http::Request, routing::{delete, post}, Router, }; - use domain::{ - errors::DomainError, - ports::{AuthService, GeneratedToken, PasswordHasher}, - testing::TestStore, - value_objects::{PasswordHash, UserId}, - }; - use std::sync::Arc; use tower::ServiceExt; - struct NoOpAuth; - impl AuthService for NoOpAuth { - fn generate_token(&self, _uid: &UserId) -> Result { - Err(DomainError::Internal("noop".into())) - } - fn validate_token(&self, _token: &str) -> Result { - Err(DomainError::Unauthorized) - } - } - - struct NoOpHasher; - #[async_trait] - impl PasswordHasher for NoOpHasher { - async fn hash(&self, _plain: &str) -> Result { - Err(DomainError::Internal("noop".into())) - } - async fn verify(&self, _plain: &str, _hash: &PasswordHash) -> Result { - Ok(false) - } - } - - fn make_state() -> crate::state::AppState { - let store = Arc::new(TestStore::default()); - crate::state::AppState { - users: store.clone(), - thoughts: store.clone(), - likes: store.clone(), - boosts: store.clone(), - follows: store.clone(), - blocks: store.clone(), - tags: store.clone(), - api_keys: store.clone(), - top_friends: store.clone(), - notifications: store.clone(), - remote_actors: store.clone(), - feed: store.clone(), - search: store.clone(), - auth: Arc::new(NoOpAuth), - hasher: Arc::new(NoOpHasher), - events: store.clone(), - federation: store.clone(), - ap_repo: store.clone(), - remote_actor_connections: store.clone(), - } - } - fn app() -> Router { Router::new() .route( diff --git a/crates/presentation/src/handlers/users.rs b/crates/presentation/src/handlers/users.rs index b07d385..c281bab 100644 --- a/crates/presentation/src/handlers/users.rs +++ b/crates/presentation/src/handlers/users.rs @@ -209,68 +209,15 @@ pub async fn lookup_handler( #[cfg(test)] mod tests { use super::*; - use async_trait::async_trait; + use crate::testing::make_state; use axum::{ body::Body, http::{header, Request}, routing::get, Router, }; - use domain::{ - errors::DomainError, - ports::{AuthService, GeneratedToken, PasswordHasher}, - testing::TestStore, - value_objects::{PasswordHash, UserId}, - }; - use std::sync::Arc; use tower::ServiceExt; - struct NoOpAuth; - impl AuthService for NoOpAuth { - fn generate_token(&self, _uid: &UserId) -> Result { - Err(DomainError::Internal("noop".into())) - } - fn validate_token(&self, _token: &str) -> Result { - Err(DomainError::Unauthorized) - } - } - - struct NoOpHasher; - #[async_trait] - impl PasswordHasher for NoOpHasher { - async fn hash(&self, _plain: &str) -> Result { - Err(DomainError::Internal("noop".into())) - } - async fn verify(&self, _plain: &str, _hash: &PasswordHash) -> Result { - Ok(false) - } - } - - fn make_state() -> crate::state::AppState { - let store = Arc::new(TestStore::default()); - crate::state::AppState { - users: store.clone(), - thoughts: store.clone(), - likes: store.clone(), - boosts: store.clone(), - follows: store.clone(), - blocks: store.clone(), - tags: store.clone(), - api_keys: store.clone(), - top_friends: store.clone(), - notifications: store.clone(), - remote_actors: store.clone(), - feed: store.clone(), - search: store.clone(), - auth: Arc::new(NoOpAuth), - hasher: Arc::new(NoOpHasher), - events: store.clone(), - federation: store.clone(), - ap_repo: store.clone(), - remote_actor_connections: store.clone(), - } - } - fn app() -> Router { Router::new() .route("/users/{username}", get(get_user)) diff --git a/crates/presentation/src/lib.rs b/crates/presentation/src/lib.rs index fa5838a..1d38562 100644 --- a/crates/presentation/src/lib.rs +++ b/crates/presentation/src/lib.rs @@ -4,3 +4,5 @@ pub mod handlers; pub mod openapi; pub mod routes; pub mod state; +#[cfg(test)] +pub mod testing; diff --git a/crates/presentation/src/testing.rs b/crates/presentation/src/testing.rs new file mode 100644 index 0000000..a73ba5a --- /dev/null +++ b/crates/presentation/src/testing.rs @@ -0,0 +1,55 @@ +use crate::state::AppState; +use async_trait::async_trait; +use domain::{ + errors::DomainError, + ports::{AuthService, GeneratedToken, PasswordHasher}, + testing::TestStore, + value_objects::{PasswordHash, UserId}, +}; +use std::sync::Arc; + +pub struct NoOpAuth; +impl AuthService for NoOpAuth { + fn generate_token(&self, _uid: &UserId) -> Result { + Err(DomainError::Internal("noop".into())) + } + fn validate_token(&self, _token: &str) -> Result { + Err(DomainError::Unauthorized) + } +} + +pub struct NoOpHasher; +#[async_trait] +impl PasswordHasher for NoOpHasher { + async fn hash(&self, _plain: &str) -> Result { + Err(DomainError::Internal("noop".into())) + } + async fn verify(&self, _plain: &str, _hash: &PasswordHash) -> Result { + Ok(false) + } +} + +pub fn make_state() -> AppState { + let store = Arc::new(TestStore::default()); + AppState { + users: store.clone(), + thoughts: store.clone(), + likes: store.clone(), + boosts: store.clone(), + follows: store.clone(), + blocks: store.clone(), + tags: store.clone(), + api_keys: store.clone(), + top_friends: store.clone(), + notifications: store.clone(), + remote_actors: store.clone(), + feed: store.clone(), + search: store.clone(), + auth: Arc::new(NoOpAuth), + hasher: Arc::new(NoOpHasher), + events: store.clone(), + federation: store.clone(), + ap_repo: store.clone(), + remote_actor_connections: store.clone(), + } +}