From 0592861edd5b52b933e298379a1a072c13b07905 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 18:54:20 +0200 Subject: [PATCH] refactor: 5 architectural improvements (Tasks 2-5 + Task 6 fix) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - feat(domain): Hashtag value object with canonical extract() — unifies two divergent private implementations; fields pre-compute raw/normalized/url_slug/ap_name - feat(presentation): Deps extractor — each handler now declares its exact dependency surface; AppState unchanged; handlers become unit-testable without mocking all 20 deps - refactor(feed): replace 5 flat FeedRepository methods with FeedQuery/FeedScope — single query() method; SQL shared logic lives once; adding feed types no longer requires 5 edits - refactor(activitypub): ActivityPubRepository + OutboundFederationPort moved out of domain::ports into activitypub-base::ap_ports — domain crate no longer knows about AP IDs, inboxes, or actor URLs - fix(outbox): OutboxRelay now opens a per-row transaction so FOR UPDATE SKIP LOCKED actually holds the lock during publish + mark_delivered --- Cargo.lock | 3 + .../adapters/activitypub-base/src/ap_ports.rs | 167 ++++++++ crates/adapters/activitypub-base/src/lib.rs | 2 + .../adapters/activitypub-base/src/service.rs | 38 +- crates/adapters/activitypub/src/handler.rs | 4 +- crates/adapters/postgres/Cargo.toml | 7 +- crates/adapters/postgres/src/activitypub.rs | 4 +- crates/adapters/postgres/src/feed.rs | 360 ++++++++---------- crates/application/Cargo.toml | 5 +- crates/application/src/lib.rs | 3 + .../src/services/federation_event.rs | 57 ++- crates/application/src/testing.rs | 150 ++++++++ crates/application/src/use_cases/auth.rs | 68 +++- .../src/use_cases/federation_management.rs | 9 +- crates/application/src/use_cases/feed.rs | 12 +- crates/application/src/use_cases/thoughts.rs | 55 +-- crates/domain/src/hashtag.rs | 139 +++++++ crates/domain/src/lib.rs | 1 + crates/domain/src/ports.rs | 224 ++--------- crates/domain/src/testing.rs | 188 +-------- crates/presentation/Cargo.toml | 5 +- crates/presentation/src/extractors.rs | 21 + crates/presentation/src/handlers/api_keys.rs | 35 +- crates/presentation/src/handlers/auth.rs | 46 ++- .../src/handlers/federation_actors.rs | 62 ++- .../src/handlers/federation_management.rs | 58 ++- crates/presentation/src/handlers/feed.rs | 80 ++-- crates/presentation/src/handlers/health.rs | 25 +- .../src/handlers/notifications.rs | 39 +- crates/presentation/src/handlers/social.rs | 97 +++-- crates/presentation/src/handlers/thoughts.rs | 66 +++- crates/presentation/src/handlers/users.rs | 66 +++- crates/presentation/src/lib.rs | 2 + crates/presentation/src/state.rs | 1 + crates/presentation/src/testing.rs | 84 +++- crates/worker/src/factory.rs | 3 +- crates/worker/src/outbox_relay.rs | 80 ++-- 37 files changed, 1401 insertions(+), 865 deletions(-) create mode 100644 crates/adapters/activitypub-base/src/ap_ports.rs create mode 100644 crates/application/src/testing.rs create mode 100644 crates/domain/src/hashtag.rs diff --git a/Cargo.lock b/Cargo.lock index 9bb70d6..bdff7c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ name = "application" version = "0.1.0" dependencies = [ + "activitypub-base", "async-trait", "chrono", "domain", @@ -2447,6 +2448,7 @@ checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" name = "postgres" version = "0.1.0" dependencies = [ + "activitypub-base", "async-trait", "chrono", "domain", @@ -2516,6 +2518,7 @@ dependencies = [ name = "presentation" version = "0.1.0" dependencies = [ + "activitypub-base", "api-types", "application", "async-trait", diff --git a/crates/adapters/activitypub-base/src/ap_ports.rs b/crates/adapters/activitypub-base/src/ap_ports.rs new file mode 100644 index 0000000..5f389f4 --- /dev/null +++ b/crates/adapters/activitypub-base/src/ap_ports.rs @@ -0,0 +1,167 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::thought::Thought, + value_objects::{ThoughtId, UserId, Username}, +}; + +/// AP-protocol endpoints for a locally-stored user (local or interned remote). +#[derive(Debug, Clone)] +pub struct ActorApUrls { + pub ap_id: String, + pub inbox_url: String, +} + +/// A local thought ready for AP serialization, with the author's username +/// pre-joined so the handler can build AP URLs without a second query. +#[derive(Debug, Clone)] +pub struct OutboxEntry { + pub thought: Thought, + pub author_username: Username, +} + +#[async_trait] +pub trait ActivityPubRepository: Send + Sync { + // ── Outbox (local → remote) ────────────────────────────────────── + + /// All public local thoughts for this actor. Used for outbox totals + /// and full-collection delivery. + async fn outbox_entries_for_actor( + &self, + user_id: &UserId, + ) -> Result, DomainError>; + + /// Cursor page of public local thoughts, newest-first, before `before`. + /// Used for OrderedCollectionPage responses. + async fn outbox_page_for_actor( + &self, + user_id: &UserId, + before: Option>, + limit: usize, + ) -> Result, DomainError>; + + // ── Remote actor resolution ────────────────────────────────────── + + /// Find the local UserId for a remote actor by its AP URL. + async fn find_remote_actor_id(&self, actor_ap_url: &str) + -> Result, DomainError>; + + /// Ensure a remote actor placeholder exists; create one if absent. + /// Idempotent — safe to call multiple times with the same URL. + async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result; + + /// Update display_name and avatar_url for an already-interned remote actor. + async fn update_remote_actor_display( + &self, + user_id: &UserId, + display_name: Option<&str>, + avatar_url: Option<&str>, + ) -> Result<(), DomainError>; + + // ── Inbox processing (remote → local) ─────────────────────────── + + /// Persist an incoming remote Note. Idempotent on ap_id. + #[allow(clippy::too_many_arguments)] + async fn accept_note( + &self, + ap_id: &str, + author_id: &UserId, + content: &str, + published: chrono::DateTime, + sensitive: bool, + content_warning: Option, + visibility: &str, + in_reply_to: Option<&str>, + ) -> Result<(), DomainError>; + + /// Apply an Update to a previously accepted remote Note. + async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError>; + + /// Remove a specific remote Note (Delete activity). Only touches + /// remotely-originated thoughts. + async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError>; + + /// Remove all Notes from a remote actor (actor-level Delete/Tombstone). + async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError>; + + // ── Node-level stats ───────────────────────────────────────────── + + /// Total locally-authored thought count for NodeInfo responses. + async fn count_local_notes(&self) -> Result; + + /// Return the ActivityPub object URL for a thought, if one is stored. + /// Returns None for local thoughts (caller constructs URL from base_url + thought_id). + async fn get_thought_ap_id( + &self, + thought_id: &ThoughtId, + ) -> Result, DomainError>; + + /// Return the AP actor URL and inbox URL for a user, if stored. + /// Returns None for users that have not been federated. + async fn get_actor_ap_urls(&self, user_id: &UserId) + -> Result, DomainError>; +} + +#[async_trait] +pub trait OutboundFederationPort: Send + Sync { + /// Fan out a new local Note to all accepted followers. + async fn broadcast_create( + &self, + author_user_id: &UserId, + thought: &Thought, + author_username: &str, + in_reply_to_url: Option<&str>, + ) -> Result<(), DomainError>; + + /// Fan out a Delete tombstone for a now-deleted local Note. + /// `thought_ap_id` is pre-constructed by the caller because the thought + /// has already been deleted from the DB when this fires. + async fn broadcast_delete( + &self, + author_user_id: &UserId, + thought_ap_id: &str, + ) -> Result<(), DomainError>; + + /// Fan out an Update(Note) for an edited local thought. + async fn broadcast_update( + &self, + author_user_id: &UserId, + thought: &Thought, + author_username: &str, + in_reply_to_url: Option<&str>, + ) -> Result<(), DomainError>; + + /// Fan out an Announce(object_ap_id) for a boost. + async fn broadcast_announce( + &self, + booster_user_id: &UserId, + object_ap_id: &str, + ) -> Result<(), DomainError>; + + /// Fan out an Undo(Announce) to followers when a boost is removed. + async fn broadcast_undo_announce( + &self, + booster_user_id: &UserId, + object_ap_id: &str, + ) -> Result<(), DomainError>; + + /// Send a Like activity to a remote thought author's inbox. + /// Only called when a LOCAL user likes a REMOTE thought (one with an ap_id). + async fn broadcast_like( + &self, + liker_user_id: &UserId, + object_ap_id: &str, + author_inbox_url: &str, + ) -> Result<(), DomainError>; + + /// Send Undo(Like) to a remote thought author's inbox. + async fn broadcast_undo_like( + &self, + liker_user_id: &UserId, + object_ap_id: &str, + author_inbox_url: &str, + ) -> Result<(), DomainError>; + + /// Fan out an Update(Actor) to all accepted followers after a profile change. + async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError>; +} diff --git a/crates/adapters/activitypub-base/src/lib.rs b/crates/adapters/activitypub-base/src/lib.rs index 515ebdb..aceb0d5 100644 --- a/crates/adapters/activitypub-base/src/lib.rs +++ b/crates/adapters/activitypub-base/src/lib.rs @@ -1,6 +1,7 @@ pub mod activities; pub mod actor_handler; pub mod actors; +pub mod ap_ports; pub mod content; pub mod data; pub mod error; @@ -17,6 +18,7 @@ pub mod user; pub mod webfinger; pub use activitypub_federation::kinds::object::NoteType; +pub use ap_ports::{ActorApUrls, ActivityPubRepository, OutboxEntry, OutboundFederationPort}; pub use content::ApObjectHandler; pub use data::FederationData; pub use error::Error; diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index 44d54a1..ff2f22f 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -50,26 +50,6 @@ fn content_to_html(text: &str) -> String { } } -fn extract_hashtag_tags(content: &str, base_url: &str) -> Vec { - let mut seen = std::collections::HashSet::new(); - let mut tags = Vec::new(); - for word in content.split_whitespace() { - let tag = word.trim_matches(|c: char| !c.is_alphanumeric() && c != '#'); - if let Some(name) = tag.strip_prefix('#') - && !name.is_empty() - && seen.insert(name.to_lowercase()) - { - let lower = name.to_lowercase(); - tags.push(serde_json::json!({ - "type": "Hashtag", - "name": format!("#{}", lower), - "href": format!("{}/tags/{}", base_url, lower), - })); - } - } - tags -} - fn thought_note_json( thought: &domain::models::thought::Thought, local_actor: &crate::actors::DbActor, @@ -114,9 +94,19 @@ fn thought_note_json( if let Some(updated_at) = thought.updated_at { note["updated"] = serde_json::json!(updated_at.to_rfc3339()); } - let hashtag_tags = extract_hashtag_tags(thought.content.as_str(), base_url); - if !hashtag_tags.is_empty() { - note["tag"] = serde_json::json!(hashtag_tags); + let hashtags = domain::hashtag::extract(thought.content.as_str()); + if !hashtags.is_empty() { + let ap_tags: Vec = hashtags + .iter() + .map(|h| { + serde_json::json!({ + "type": "Hashtag", + "name": h.ap_name, + "href": format!("{}/{}", base_url, h.url_slug), + }) + }) + .collect(); + note["tag"] = serde_json::json!(ap_tags); } Ok((ap_id, note)) } @@ -1405,7 +1395,7 @@ impl ActivityPubService { } #[async_trait::async_trait] -impl domain::ports::OutboundFederationPort for ActivityPubService { +impl crate::ap_ports::OutboundFederationPort for ActivityPubService { // Actor identity (ap_id, followers_url) comes from federation config via get_local_actor. // author_username is provided by the caller but not needed here. async fn broadcast_create( diff --git a/crates/adapters/activitypub/src/handler.rs b/crates/adapters/activitypub/src/handler.rs index be46735..faaa5d3 100644 --- a/crates/adapters/activitypub/src/handler.rs +++ b/crates/adapters/activitypub/src/handler.rs @@ -9,8 +9,8 @@ use url::Url; use crate::note::ThoughtNote; use crate::urls::ThoughtsUrls; -use activitypub_base::ApObjectHandler; -use domain::ports::{ActivityPubRepository, EventPublisher}; +use activitypub_base::{ActivityPubRepository, ApObjectHandler}; +use domain::ports::EventPublisher; use domain::value_objects::UserId; pub struct ThoughtsObjectHandler { diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index ae4ee47..568951a 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -4,9 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] -domain = { workspace = true } -event-payload = { workspace = true } -sqlx = { workspace = true } +domain = { workspace = true } +activitypub-base = { workspace = true } +event-payload = { workspace = true } +sqlx = { workspace = true } uuid = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } diff --git a/crates/adapters/postgres/src/activitypub.rs b/crates/adapters/postgres/src/activitypub.rs index 7b5dd4d..67050f4 100644 --- a/crates/adapters/postgres/src/activitypub.rs +++ b/crates/adapters/postgres/src/activitypub.rs @@ -6,10 +6,10 @@ const THOUGHTS_PATH_PREFIX: &str = "/thoughts/"; use chrono::{DateTime, Utc}; use sqlx::PgPool; +use activitypub_base::{ActivityPubRepository, ActorApUrls, OutboxEntry}; use domain::{ errors::DomainError, models::thought::{Thought, Visibility}, - ports::{ActivityPubRepository, ActorApUrls, OutboxEntry}, value_objects::{Content, ThoughtId, UserId, Username}, }; @@ -328,7 +328,7 @@ impl ActivityPubRepository for PgActivityPubRepository { #[cfg(test)] mod tests { use super::*; - use domain::ports::ActivityPubRepository; + use activitypub_base::ActivityPubRepository; #[sqlx::test(migrations = "./migrations")] async fn intern_remote_actor_is_idempotent(pool: sqlx::PgPool) { diff --git a/crates/adapters/postgres/src/feed.rs b/crates/adapters/postgres/src/feed.rs index 297985b..d335e84 100644 --- a/crates/adapters/postgres/src/feed.rs +++ b/crates/adapters/postgres/src/feed.rs @@ -5,11 +5,11 @@ use chrono::{DateTime, Utc}; use domain::{ errors::DomainError, models::{ - feed::{FeedEntry, PageParams, Paginated}, + feed::{FeedEntry, Paginated}, thought::{Thought, Visibility}, user::User, }, - ports::FeedRepository, + ports::{FeedQuery, FeedRepository, FeedScope}, value_objects::{Content, Email, PasswordHash, ThoughtId, UserId, Username}, }; use sqlx::PgPool; @@ -150,201 +150,178 @@ fn row_to_entry(r: FeedRow, viewer: Option) -> Result, - ) -> Result, DomainError> { - let ids: Vec = following_ids.iter().map(|id| id.as_uuid()).collect(); - let viewer = viewer_id.map(|v| v.as_uuid()); - let fed_clause = federation_following_clause(viewer); - let count_sql = format!( - "SELECT COUNT(*) FROM thoughts t WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct'", - fed_clause - ); - let total: i64 = sqlx::query_scalar(&count_sql) - .bind(&ids) - .fetch_one(&self.pool) - .await - .into_domain()?; + async fn query(&self, q: &FeedQuery) -> Result, DomainError> { + let viewer = q.viewer_id.as_ref().map(|v| v.as_uuid()); + let page = &q.page; - let sel = feed_select(viewer); - let sql = format!("{sel} WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3", fed_clause); - let rows = sqlx::query_as::<_, FeedRow>(&sql) - .bind(&ids) - .bind(page.limit()) - .bind(page.offset()) - .fetch_all(&self.pool) - .await - .into_domain()?; + match &q.scope { + FeedScope::Home { following_ids } => { + let ids: Vec = following_ids.iter().map(|id| id.as_uuid()).collect(); + let fed_clause = federation_following_clause(viewer); + let count_sql = format!( + "SELECT COUNT(*) FROM thoughts t WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct'", + fed_clause + ); + let total: i64 = sqlx::query_scalar(&count_sql) + .bind(&ids) + .fetch_one(&self.pool) + .await + .into_domain()?; - Ok(Paginated { - items: rows - .into_iter() - .map(|r| row_to_entry(r, viewer)) - .collect::, _>>()?, - total, - page: page.page, - per_page: page.per_page, - }) - } + let sel = feed_select(viewer); + let sql = format!("{sel} WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3", fed_clause); + let rows = sqlx::query_as::<_, FeedRow>(&sql) + .bind(&ids) + .bind(page.limit()) + .bind(page.offset()) + .fetch_all(&self.pool) + .await + .into_domain()?; - async fn public_feed( - &self, - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError> { - let viewer = viewer_id.map(|v| v.as_uuid()); - let total: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM thoughts t WHERE t.local=true AND t.visibility='public'", - ) - .fetch_one(&self.pool) - .await - .into_domain()?; + Ok(Paginated { + items: rows + .into_iter() + .map(|r| row_to_entry(r, viewer)) + .collect::, _>>()?, + total, + page: page.page, + per_page: page.per_page, + }) + } - let sel = feed_select(viewer); - let sql = format!("{sel} WHERE t.local=true AND t.visibility='public' ORDER BY t.created_at DESC LIMIT $1 OFFSET $2"); - let rows = sqlx::query_as::<_, FeedRow>(&sql) - .bind(page.limit()) - .bind(page.offset()) - .fetch_all(&self.pool) - .await - .into_domain()?; + FeedScope::Public => { + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thoughts t WHERE t.local=true AND t.visibility='public'", + ) + .fetch_one(&self.pool) + .await + .into_domain()?; - Ok(Paginated { - items: rows - .into_iter() - .map(|r| row_to_entry(r, viewer)) - .collect::, _>>()?, - total, - page: page.page, - per_page: page.per_page, - }) - } + let sel = feed_select(viewer); + let sql = format!("{sel} WHERE t.local=true AND t.visibility='public' ORDER BY t.created_at DESC LIMIT $1 OFFSET $2"); + let rows = sqlx::query_as::<_, FeedRow>(&sql) + .bind(page.limit()) + .bind(page.offset()) + .fetch_all(&self.pool) + .await + .into_domain()?; - async fn search( - &self, - query: &str, - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError> { - let viewer = viewer_id.map(|v| v.as_uuid()); - let total: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM thoughts t WHERE t.content % $1 AND t.visibility='public'", - ) - .bind(query) - .fetch_one(&self.pool) - .await - .into_domain()?; + Ok(Paginated { + items: rows + .into_iter() + .map(|r| row_to_entry(r, viewer)) + .collect::, _>>()?, + total, + page: page.page, + per_page: page.per_page, + }) + } - let sel = feed_select(viewer); - let sql = format!("{sel} WHERE t.content % $1 AND t.visibility='public' ORDER BY similarity(t.content, $1) DESC LIMIT $2 OFFSET $3"); - let rows = sqlx::query_as::<_, FeedRow>(&sql) - .bind(query) - .bind(page.limit()) - .bind(page.offset()) - .fetch_all(&self.pool) - .await - .into_domain()?; + FeedScope::Search { query } => { + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thoughts t WHERE t.content % $1 AND t.visibility='public'", + ) + .bind(query) + .fetch_one(&self.pool) + .await + .into_domain()?; - Ok(Paginated { - items: rows - .into_iter() - .map(|r| row_to_entry(r, viewer)) - .collect::, _>>()?, - total, - page: page.page, - per_page: page.per_page, - }) - } + let sel = feed_select(viewer); + let sql = format!("{sel} WHERE t.content % $1 AND t.visibility='public' ORDER BY similarity(t.content, $1) DESC LIMIT $2 OFFSET $3"); + let rows = sqlx::query_as::<_, FeedRow>(&sql) + .bind(query) + .bind(page.limit()) + .bind(page.offset()) + .fetch_all(&self.pool) + .await + .into_domain()?; - async fn tag_feed( - &self, - tag_name: &str, - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError> { - let viewer = viewer_id.map(|v| v.as_uuid()); - let total: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM thoughts t - JOIN thought_tags tt ON tt.thought_id = t.id - JOIN tags tg ON tg.id = tt.tag_id - WHERE tg.name = $1 AND t.visibility = 'public'", - ) - .bind(tag_name) - .fetch_one(&self.pool) - .await - .into_domain()?; + Ok(Paginated { + items: rows + .into_iter() + .map(|r| row_to_entry(r, viewer)) + .collect::, _>>()?, + total, + page: page.page, + per_page: page.per_page, + }) + } - let sel = feed_select(viewer); - let sql = format!( - "{sel} - JOIN thought_tags tt ON tt.thought_id = t.id - JOIN tags tg ON tg.id = tt.tag_id - WHERE tg.name = $1 AND t.visibility = 'public' - ORDER BY t.created_at DESC LIMIT $2 OFFSET $3" - ); - let rows = sqlx::query_as::<_, FeedRow>(&sql) - .bind(tag_name) - .bind(page.limit()) - .bind(page.offset()) - .fetch_all(&self.pool) - .await - .into_domain()?; + FeedScope::Tag { tag_name } => { + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thoughts t + JOIN thought_tags tt ON tt.thought_id = t.id + JOIN tags tg ON tg.id = tt.tag_id + WHERE tg.name = $1 AND t.visibility = 'public'", + ) + .bind(tag_name) + .fetch_one(&self.pool) + .await + .into_domain()?; - Ok(Paginated { - items: rows - .into_iter() - .map(|r| row_to_entry(r, viewer)) - .collect::, _>>()?, - total, - page: page.page, - per_page: page.per_page, - }) - } + let sel = feed_select(viewer); + let sql = format!( + "{sel} + JOIN thought_tags tt ON tt.thought_id = t.id + JOIN tags tg ON tg.id = tt.tag_id + WHERE tg.name = $1 AND t.visibility = 'public' + ORDER BY t.created_at DESC LIMIT $2 OFFSET $3" + ); + let rows = sqlx::query_as::<_, FeedRow>(&sql) + .bind(tag_name) + .bind(page.limit()) + .bind(page.offset()) + .fetch_all(&self.pool) + .await + .into_domain()?; - async fn user_feed( - &self, - user_id: &UserId, - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError> { - let viewer = viewer_id.map(|v| v.as_uuid()); - let uid = user_id.as_uuid(); + Ok(Paginated { + items: rows + .into_iter() + .map(|r| row_to_entry(r, viewer)) + .collect::, _>>()?, + total, + page: page.page, + per_page: page.per_page, + }) + } - // Use nil UUID for unauthenticated viewers — won't match owner or follower checks. - let viewer_uuid = viewer.unwrap_or(uuid::Uuid::nil()); + FeedScope::User { user_id } => { + let uid = user_id.as_uuid(); + // Use nil UUID for unauthenticated viewers — won't match owner or follower checks. + let viewer_uuid = viewer.unwrap_or(uuid::Uuid::nil()); - let total: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM thoughts t WHERE t.user_id = $1 AND ($2::uuid = $1 OR (t.visibility != 'direct' AND (t.visibility IN ('public', 'unlisted') OR (t.visibility = 'followers' AND EXISTS(SELECT 1 FROM follows WHERE follower_id = $2 AND following_id = $1 AND state = 'accepted')))))", - ) - .bind(uid) - .bind(viewer_uuid) - .fetch_one(&self.pool) - .await - .into_domain()?; + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thoughts t WHERE t.user_id = $1 AND ($2::uuid = $1 OR (t.visibility != 'direct' AND (t.visibility IN ('public', 'unlisted') OR (t.visibility = 'followers' AND EXISTS(SELECT 1 FROM follows WHERE follower_id = $2 AND following_id = $1 AND state = 'accepted')))))", + ) + .bind(uid) + .bind(viewer_uuid) + .fetch_one(&self.pool) + .await + .into_domain()?; - let sel = feed_select(viewer); - let sql = format!("{sel} WHERE t.user_id = $1 AND ($4::uuid = $1 OR (t.visibility != 'direct' AND (t.visibility IN ('public', 'unlisted') OR (t.visibility = 'followers' AND EXISTS(SELECT 1 FROM follows WHERE follower_id = $4 AND following_id = $1 AND state = 'accepted'))))) ORDER BY t.created_at DESC LIMIT $2 OFFSET $3"); - let rows = sqlx::query_as::<_, FeedRow>(&sql) - .bind(uid) - .bind(page.limit()) - .bind(page.offset()) - .bind(viewer_uuid) - .fetch_all(&self.pool) - .await - .into_domain()?; + let sel = feed_select(viewer); + let sql = format!("{sel} WHERE t.user_id = $1 AND ($4::uuid = $1 OR (t.visibility != 'direct' AND (t.visibility IN ('public', 'unlisted') OR (t.visibility = 'followers' AND EXISTS(SELECT 1 FROM follows WHERE follower_id = $4 AND following_id = $1 AND state = 'accepted'))))) ORDER BY t.created_at DESC LIMIT $2 OFFSET $3"); + let rows = sqlx::query_as::<_, FeedRow>(&sql) + .bind(uid) + .bind(page.limit()) + .bind(page.offset()) + .bind(viewer_uuid) + .fetch_all(&self.pool) + .await + .into_domain()?; - Ok(Paginated { - items: rows - .into_iter() - .map(|r| row_to_entry(r, viewer)) - .collect::, _>>()?, - total, - page: page.page, - per_page: page.per_page, - }) + Ok(Paginated { + items: rows + .into_iter() + .map(|r| row_to_entry(r, viewer)) + .collect::, _>>()?, + total, + page: page.page, + per_page: page.per_page, + }) + } + } } } @@ -354,10 +331,11 @@ mod tests { use crate::{thought::PgThoughtRepository, user::PgUserRepository}; use domain::{ models::{ + feed::PageParams, thought::{Thought, Visibility}, user::User, }, - ports::{ThoughtRepository, UserWriter}, + ports::{FeedQuery, ThoughtRepository, UserWriter}, value_objects::*, }; @@ -389,13 +367,10 @@ mod tests { let (_, _) = seed(&pool, "alice", "hello").await; let repo = PgFeedRepository::new(pool); let result = repo - .public_feed( - &PageParams { - page: 1, - per_page: 20, - }, + .query(&FeedQuery::public( + PageParams { page: 1, per_page: 20 }, None, - ) + )) .await .unwrap(); assert_eq!(result.total, 1); @@ -408,14 +383,11 @@ mod tests { let (_, _) = seed(&pool, "bob", "goodbye world").await; let repo = PgFeedRepository::new(pool); let result = repo - .search( + .query(&FeedQuery::search( "hello world", - &PageParams { - page: 1, - per_page: 20, - }, + PageParams { page: 1, per_page: 20 }, None, - ) + )) .await .unwrap(); assert!(result.total >= 1); diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index a7909fa..4669625 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -4,8 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] -domain = { workspace = true } -async-trait = { workspace = true } +domain = { workspace = true } +activitypub-base = { workspace = true } +async-trait = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index 05db0a8..9385df2 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -1,2 +1,5 @@ pub mod services; pub mod use_cases; + +#[cfg(test)] +pub mod testing; diff --git a/crates/application/src/services/federation_event.rs b/crates/application/src/services/federation_event.rs index 3424068..4c5ecb7 100644 --- a/crates/application/src/services/federation_event.rs +++ b/crates/application/src/services/federation_event.rs @@ -1,8 +1,9 @@ +use activitypub_base::{ActivityPubRepository, OutboundFederationPort}; use domain::{ errors::DomainError, events::DomainEvent, models::thought::Visibility, - ports::{ActivityPubRepository, OutboundFederationPort, ThoughtRepository, UserReader}, + ports::{ThoughtRepository, UserReader}, value_objects::ThoughtId, }; use std::sync::Arc; @@ -212,13 +213,14 @@ impl FederationEventService { #[cfg(test)] mod tests { use super::*; + use activitypub_base::{ActorApUrls, OutboundFederationPort}; use async_trait::async_trait; + use crate::testing::TestApRepo; use domain::{ errors::DomainError, events::DomainEvent, models::thought::{Thought, Visibility}, models::user::User, - ports::{ActivityPubRepository, OutboundFederationPort}, testing::TestStore, value_objects::*, }; @@ -325,12 +327,23 @@ mod tests { } fn svc(store: &TestStore, spy: Arc) -> FederationEventService { + let ap_repo = TestApRepo::new(store.clone()); FederationEventService { thoughts: Arc::new(store.clone()), users: Arc::new(store.clone()), ap: spy, base_url: "https://example.com".to_string(), - ap_repo: Arc::new(store.clone()), + ap_repo: Arc::new(ap_repo), + } + } + + fn svc_with_ap(store: &TestStore, ap_repo: TestApRepo, spy: Arc) -> FederationEventService { + FederationEventService { + thoughts: Arc::new(store.clone()), + users: Arc::new(store.clone()), + ap: spy, + base_url: "https://example.com".to_string(), + ap_repo: Arc::new(ap_repo), } } @@ -452,7 +465,8 @@ mod tests { let alice = alice(); let mut thought = local_thought(alice.id.clone()); thought.local = false; - store.thought_ap_ids.lock().unwrap().insert( + let ap_repo = TestApRepo::new(store.clone()); + ap_repo.inner.thought_ap_ids.lock().unwrap().insert( thought.id.clone(), "https://mastodon.social/users/bob/statuses/123".into(), ); @@ -460,7 +474,7 @@ mod tests { store.thoughts.lock().unwrap().push(thought.clone()); let spy = Arc::new(SpyPort::default()); - svc(&store, spy.clone()) + svc_with_ap(&store, ap_repo, spy.clone()) .process(&DomainEvent::BoostAdded { boost_id: BoostId::new(), user_id: alice.id.clone(), @@ -604,14 +618,15 @@ mod tests { let alice = alice(); let mut thought = local_thought(alice.id.clone()); thought.local = false; - store.thought_ap_ids.lock().unwrap().insert( + let ap_repo = TestApRepo::new(store.clone()); + ap_repo.inner.thought_ap_ids.lock().unwrap().insert( thought.id.clone(), "https://mastodon.social/users/bob/statuses/456".into(), ); store.thoughts.lock().unwrap().push(thought.clone()); let spy = Arc::new(SpyPort::default()); - svc(&store, spy.clone()) + svc_with_ap(&store, ap_repo, spy.clone()) .process(&DomainEvent::BoostRemoved { user_id: alice.id.clone(), thought_id: thought.id.clone(), @@ -673,28 +688,28 @@ mod tests { PasswordHash("h".into()), ); author.local = false; - store.actor_ap_urls.lock().unwrap().insert( - author.id.clone(), - domain::ports::ActorApUrls { - ap_id: "https://mastodon.social/users/author".into(), - inbox_url: "https://mastodon.social/users/author/inbox".into(), - }, - ); - let thought = local_thought(author.id.clone()); - store.thought_ap_ids.lock().unwrap().insert( - thought.id.clone(), - "https://mastodon.social/posts/123".into(), - ); - let liker = alice(); store.users.lock().unwrap().push(author.clone()); store.users.lock().unwrap().push(liker.clone()); store.thoughts.lock().unwrap().push(thought.clone()); + let ap_repo = TestApRepo::new(store.clone()); + ap_repo.actor_ap_urls.lock().unwrap().insert( + author.id.clone(), + ActorApUrls { + ap_id: "https://mastodon.social/users/author".into(), + inbox_url: "https://mastodon.social/users/author/inbox".into(), + }, + ); + ap_repo.inner.thought_ap_ids.lock().unwrap().insert( + thought.id.clone(), + "https://mastodon.social/posts/123".into(), + ); + let spy = Arc::new(SpyPort::default()); - svc(&store, spy.clone()) + svc_with_ap(&store, ap_repo, spy.clone()) .process(&DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: liker.id, diff --git a/crates/application/src/testing.rs b/crates/application/src/testing.rs new file mode 100644 index 0000000..689289e --- /dev/null +++ b/crates/application/src/testing.rs @@ -0,0 +1,150 @@ +/// Test helpers for application-layer tests that need activitypub_base traits. +use activitypub_base::{ActivityPubRepository, ActorApUrls, OutboxEntry}; +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::user::User, + testing::TestStore, + value_objects::{Email, PasswordHash, ThoughtId, UserId, Username}, +}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +/// Extends `TestStore` with AP-specific lookup maps. +#[derive(Default, Clone)] +pub struct TestApRepo { + pub inner: TestStore, + /// UserId → ActorApUrls (for get_actor_ap_urls) + pub actor_ap_urls: Arc>>, +} + +impl TestApRepo { + pub fn new(inner: TestStore) -> Self { + Self { + inner, + actor_ap_urls: Default::default(), + } + } +} + +#[async_trait] +impl ActivityPubRepository for TestApRepo { + async fn outbox_entries_for_actor( + &self, + _uid: &UserId, + ) -> Result, DomainError> { + Ok(vec![]) + } + async fn outbox_page_for_actor( + &self, + _uid: &UserId, + _before: Option>, + _limit: usize, + ) -> Result, DomainError> { + Ok(vec![]) + } + async fn find_remote_actor_id( + &self, + actor_ap_url: &str, + ) -> Result, DomainError> { + Ok(self + .inner + .actor_ap_ids + .lock() + .unwrap() + .get(actor_ap_url) + .cloned()) + } + async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result { + if let Some(uid) = self.find_remote_actor_id(actor_ap_url).await? { + return Ok(uid); + } + let uid = UserId::new(); + let handle = url::Url::parse(actor_ap_url) + .map(|u| u.path().trim_start_matches('/').replace('/', "_")) + .unwrap_or_else(|_| format!("remote_{}", &uid.to_string()[..8])); + let user = User { + id: uid.clone(), + username: Username::from_trusted(handle), + email: Email::from_trusted(format!("{}@remote", uid)), + password_hash: PasswordHash("".into()), + display_name: None, + bio: None, + avatar_url: None, + header_url: None, + custom_css: None, + local: false, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + self.inner.users.lock().unwrap().push(user); + self.inner + .actor_ap_ids + .lock() + .unwrap() + .insert(actor_ap_url.to_string(), uid.clone()); + Ok(uid) + } + async fn update_remote_actor_display( + &self, + _user_id: &UserId, + _display_name: Option<&str>, + _avatar_url: Option<&str>, + ) -> Result<(), DomainError> { + Ok(()) + } + async fn accept_note( + &self, + _ap_id: &str, + _author_id: &UserId, + _content: &str, + _published: chrono::DateTime, + _sensitive: bool, + _content_warning: Option, + _visibility: &str, + _in_reply_to: Option<&str>, + ) -> Result<(), DomainError> { + Ok(()) + } + async fn apply_note_update( + &self, + _ap_id: &str, + _new_content: &str, + ) -> Result<(), DomainError> { + Ok(()) + } + async fn retract_note(&self, _ap_id: &str) -> Result<(), DomainError> { + Ok(()) + } + async fn retract_actor_notes(&self, _actor_ap_url: &str) -> Result<(), DomainError> { + Ok(()) + } + async fn count_local_notes(&self) -> Result { + Ok(self + .inner + .thoughts + .lock() + .unwrap() + .iter() + .filter(|t| t.local) + .count() as u64) + } + async fn get_thought_ap_id( + &self, + thought_id: &ThoughtId, + ) -> Result, DomainError> { + Ok(self + .inner + .thought_ap_ids + .lock() + .unwrap() + .get(thought_id) + .cloned()) + } + async fn get_actor_ap_urls( + &self, + user_id: &UserId, + ) -> Result, DomainError> { + Ok(self.actor_ap_urls.lock().unwrap().get(user_id).cloned()) + } +} diff --git a/crates/application/src/use_cases/auth.rs b/crates/application/src/use_cases/auth.rs index c375daf..b717733 100644 --- a/crates/application/src/use_cases/auth.rs +++ b/crates/application/src/use_cases/auth.rs @@ -38,13 +38,11 @@ pub async fn register( .save(&user) .await .map_err(|e| match e { - DomainError::Conflict(ref c) if c.contains("username") => { - DomainError::Conflict("username taken".into()) - } - DomainError::Conflict(ref c) if c.contains("email") => { - DomainError::Conflict("email taken".into()) - } - DomainError::Conflict(_) => DomainError::Conflict("already exists".into()), + DomainError::Conflict(c) => match c.as_str() { + "users_username_key" => DomainError::Conflict("username taken".into()), + "users_email_key" => DomainError::Conflict("email taken".into()), + _ => DomainError::Conflict("already exists".into()), + }, other => other, })?; events @@ -111,6 +109,7 @@ mod tests { /// Simulates a concurrent registration that slips past the pre-checks and /// hits the DB unique constraint — exactly what happens in the TOCTOU window. struct ConflictOnSaveStore(TestStore); + struct EmailConflictOnSaveStore(TestStore); #[async_trait] impl UserReader for ConflictOnSaveStore { @@ -154,6 +153,48 @@ mod tests { } } + #[async_trait] + impl UserReader for EmailConflictOnSaveStore { + async fn find_by_id(&self, id: &UserId) -> Result, DomainError> { + self.0.find_by_id(id).await + } + async fn find_by_username( + &self, + username: &Username, + ) -> Result, DomainError> { + self.0.find_by_username(username).await + } + async fn find_by_email(&self, email: &Email) -> Result, DomainError> { + self.0.find_by_email(email).await + } + async fn list_with_stats(&self) -> Result, DomainError> { + self.0.list_with_stats().await + } + async fn count(&self) -> Result { + self.0.count().await + } + } + + #[async_trait] + impl UserWriter for EmailConflictOnSaveStore { + async fn save(&self, _user: &User) -> Result<(), DomainError> { + Err(DomainError::Conflict("users_email_key".into())) + } + async fn update_profile( + &self, + user_id: &UserId, + display_name: Option, + bio: Option, + avatar_url: Option, + header_url: Option, + custom_css: Option, + ) -> Result<(), DomainError> { + self.0 + .update_profile(user_id, display_name, bio, avatar_url, header_url, custom_css) + .await + } + } + struct FakeHasher; #[async_trait] impl PasswordHasher for FakeHasher { @@ -315,4 +356,17 @@ mod tests { err ); } + + #[tokio::test] + async fn register_maps_db_conflict_on_email_to_conflict() { + let store = EmailConflictOnSaveStore(TestStore::default()); + let err = register(&store, &FakeHasher, &FakeAuth, &NoOpEventPublisher, input()) + .await + .unwrap_err(); + assert!( + matches!(err, DomainError::Conflict(ref m) if m == "email taken"), + "expected 'email taken', got: {:?}", + err + ); + } } diff --git a/crates/application/src/use_cases/federation_management.rs b/crates/application/src/use_cases/federation_management.rs index 8ddeb3b..f7eed3d 100644 --- a/crates/application/src/use_cases/federation_management.rs +++ b/crates/application/src/use_cases/federation_management.rs @@ -1,3 +1,4 @@ +use activitypub_base::ActivityPubRepository; use domain::{ errors::DomainError, models::{ @@ -6,9 +7,9 @@ use domain::{ remote_actor::RemoteActor, }, ports::{ - ActivityPubRepository, EventPublisher, FederationActionPort, FederationFollowPort, - FederationFollowRequestPort, FederationSchedulerPort, FeedRepository, FollowRepository, - RemoteActorConnectionRepository, UserReader, + EventPublisher, FederationActionPort, FederationFollowPort, + FederationFollowRequestPort, FederationSchedulerPort, FeedQuery, FeedRepository, + FollowRepository, RemoteActorConnectionRepository, UserReader, }, value_objects::UserId, }; @@ -85,7 +86,7 @@ pub async fn get_remote_actor_posts( Some(id) => id, None => ap_repo.intern_remote_actor(&actor.url).await?, }; - let result = feed.user_feed(&author_id, &page, viewer_id).await?; + let result = feed.query(&FeedQuery::user(author_id, page.clone(), viewer_id.cloned())).await?; if let Some(outbox_url) = actor.outbox_url { let _ = scheduler .schedule_actor_posts_fetch(&actor.url, &outbox_url) diff --git a/crates/application/src/use_cases/feed.rs b/crates/application/src/use_cases/feed.rs index 3275a1a..13af651 100644 --- a/crates/application/src/use_cases/feed.rs +++ b/crates/application/src/use_cases/feed.rs @@ -4,7 +4,7 @@ use domain::{ feed::{FeedEntry, PageParams, Paginated, UserSummary}, user::User, }, - ports::{FeedRepository, FollowRepository, TagRepository, UserReader}, + ports::{FeedQuery, FeedRepository, FollowRepository, TagRepository, UserReader}, value_objects::UserId, }; @@ -16,7 +16,7 @@ pub async fn get_home_feed( ) -> Result, DomainError> { let mut following_ids = follows.get_accepted_following_ids(user_id).await?; following_ids.push(user_id.clone()); // include own thoughts in home feed - feed.home_feed(&following_ids, &page, Some(user_id)).await + feed.query(&FeedQuery::home(user_id.clone(), following_ids, page)).await } pub async fn get_public_feed( @@ -24,7 +24,7 @@ pub async fn get_public_feed( viewer_id: Option<&UserId>, page: PageParams, ) -> Result, DomainError> { - feed.public_feed(&page, viewer_id).await + feed.query(&FeedQuery::public(page, viewer_id.cloned())).await } pub async fn get_user_feed( @@ -33,7 +33,7 @@ pub async fn get_user_feed( page: PageParams, viewer_id: Option<&UserId>, ) -> Result, DomainError> { - feed.user_feed(user_id, &page, viewer_id).await + feed.query(&FeedQuery::user(user_id.clone(), page, viewer_id.cloned())).await } pub async fn get_followers( @@ -58,7 +58,7 @@ pub async fn get_by_tag( page: PageParams, viewer_id: Option<&UserId>, ) -> Result, DomainError> { - feed.tag_feed(tag_name, &page, viewer_id).await + feed.query(&FeedQuery::tag(tag_name, page, viewer_id.cloned())).await } pub async fn search( @@ -67,7 +67,7 @@ pub async fn search( page: PageParams, viewer_id: Option<&UserId>, ) -> Result, DomainError> { - feed.search(query, &page, viewer_id).await + feed.query(&FeedQuery::search(query, page, viewer_id.cloned())).await } pub async fn list_users(users: &dyn UserReader) -> Result, DomainError> { diff --git a/crates/application/src/use_cases/thoughts.rs b/crates/application/src/use_cases/thoughts.rs index e72a695..abb8b98 100644 --- a/crates/application/src/use_cases/thoughts.rs +++ b/crates/application/src/use_cases/thoughts.rs @@ -6,30 +6,6 @@ use domain::{ value_objects::{Content, ThoughtId, UserId}, }; -fn extract_hashtags(content: &str) -> Vec { - let mut tags = Vec::new(); - let mut chars = content.char_indices().peekable(); - while let Some((_, c)) = chars.next() { - if c == '#' - && chars - .peek() - .map(|(_, nc)| nc.is_alphanumeric()) - .unwrap_or(false) - { - let tag: String = chars - .by_ref() - .take_while(|(_, nc)| nc.is_alphanumeric() || *nc == '_') - .map(|(_, nc)| nc) - .collect(); - if !tag.is_empty() { - tags.push(tag.to_lowercase()); - } - } - } - tags.dedup(); - tags -} - fn require_owner(thought: &Thought, user_id: &UserId) -> Result<(), DomainError> { if thought.user_id != *user_id { return Err(DomainError::NotFound); @@ -76,8 +52,8 @@ pub async fn create_thought( thoughts.save(&thought).await?; // Extract and attach hashtags from content. - for tag_name in extract_hashtags(content.as_str()) { - if let Ok(tag) = tags.find_or_create(&tag_name).await { + for h in domain::hashtag::extract(content.as_str()) { + if let Ok(tag) = tags.find_or_create(&h.normalized).await { let _ = tags.attach_to_thought(&thought.id, tag.id).await; } } @@ -195,6 +171,33 @@ mod tests { assert!(matches!(staged[0], DomainEvent::ThoughtCreated { .. })); } + #[tokio::test] + async fn delete_thought_stages_outbox_event() { + let store = TestStore::default(); + let outbox = TestOutbox::default(); + let u = user(); + store.users.lock().unwrap().push(u.clone()); + let out = create_thought( + &store, + &store, + &store, + &NoOpEventPublisher, + &NoOpOutboxWriter, + input(u.id.clone()), + ) + .await + .unwrap(); + let tid = out.thought.id.clone(); + + delete_thought(&store, &NoOpEventPublisher, &outbox, &tid, &u.id) + .await + .unwrap(); + + let staged = outbox.staged(); + assert_eq!(staged.len(), 1); + assert!(matches!(&staged[0], DomainEvent::ThoughtDeleted { thought_id, .. } if *thought_id == tid)); + } + #[tokio::test] async fn delete_own_thought_succeeds() { let store = TestStore::default(); diff --git a/crates/domain/src/hashtag.rs b/crates/domain/src/hashtag.rs new file mode 100644 index 0000000..7988a36 --- /dev/null +++ b/crates/domain/src/hashtag.rs @@ -0,0 +1,139 @@ +use std::collections::HashSet; + +/// A hashtag extracted from content. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Hashtag { + /// Original casing, e.g. "Rust" + pub raw: String, + /// Lowercased, e.g. "rust" — used for DB lookups + pub normalized: String, + /// "tags/rust" — callers prepend base_url + pub url_slug: String, + /// "#rust" — used directly in AP tag array + pub ap_name: String, +} + +/// Extract hashtags from content using a char-by-char scan. +/// +/// Rules: +/// - Tag starts after a bare `#` followed immediately by an alphanumeric char. +/// - Tag chars: `[A-Za-z0-9_]`. +/// - Deduplicated case-insensitively; first occurrence wins. +/// - Returned in order of first appearance. +pub fn extract(content: &str) -> Vec { + let mut seen: HashSet = HashSet::new(); + let mut tags: Vec = Vec::new(); + let mut chars = content.char_indices().peekable(); + + while let Some((_, c)) = chars.next() { + if c == '#' + && chars + .peek() + .map(|(_, nc)| nc.is_alphanumeric()) + .unwrap_or(false) + { + let raw: String = chars + .by_ref() + .take_while(|(_, nc)| nc.is_alphanumeric() || *nc == '_') + .map(|(_, nc)| nc) + .collect(); + + if raw.is_empty() { + continue; + } + + let normalized = raw.to_lowercase(); + if seen.insert(normalized.clone()) { + tags.push(Hashtag { + url_slug: format!("tags/{}", normalized), + ap_name: format!("#{}", normalized), + raw, + normalized, + }); + } + } + } + + tags +} + +#[cfg(test)] +mod tests { + use super::*; + + fn names(tags: &[Hashtag]) -> Vec<&str> { + tags.iter().map(|h| h.normalized.as_str()).collect() + } + + #[test] + fn basic() { + let tags = extract("Hello #world and #Rust!"); + assert_eq!(names(&tags), ["world", "rust"]); + } + + #[test] + fn fields() { + let tags = extract("#Rust"); + assert_eq!(tags.len(), 1); + let h = &tags[0]; + assert_eq!(h.raw, "Rust"); + assert_eq!(h.normalized, "rust"); + assert_eq!(h.url_slug, "tags/rust"); + assert_eq!(h.ap_name, "#rust"); + } + + #[test] + fn dedup_case_insensitive() { + let tags = extract("#rust #Rust #RUST"); + assert_eq!(names(&tags), ["rust"]); + assert_eq!(tags[0].raw, "rust"); // first occurrence wins + } + + #[test] + fn deduplicates_non_adjacent() { + // The old algorithm used Vec::dedup() which only removes adjacent duplicates. + // Using HashSet silently fixed this bug. This test documents the fix. + let tags = extract("#a #b #a"); + assert_eq!(tags.len(), 2); + assert_eq!(tags[0].normalized, "a"); + assert_eq!(tags[1].normalized, "b"); + } + + #[test] + fn mid_word_extracted() { + // `text#tag` — `#` not preceded by whitespace is still matched by the + // char-by-char scan (the old algorithm didn't require whitespace before `#`). + // This test documents the authoritative behaviour: mid-word tags ARE extracted. + let tags = extract("text#tag"); + assert_eq!(names(&tags), ["tag"]); + } + + #[test] + fn hash_only_ignored() { + assert!(extract("# lone hash").is_empty()); + } + + #[test] + fn trailing_punctuation_excluded() { + // punctuation after tag terminates the tag, not included + let tags = extract("#rust."); + assert_eq!(names(&tags), ["rust"]); + } + + #[test] + fn underscore_allowed() { + let tags = extract("#hello_world"); + assert_eq!(names(&tags), ["hello_world"]); + } + + #[test] + fn empty_content() { + assert!(extract("").is_empty()); + } + + #[test] + fn order_of_appearance() { + let tags = extract("#b #a #c"); + assert_eq!(names(&tags), ["b", "a", "c"]); + } +} diff --git a/crates/domain/src/lib.rs b/crates/domain/src/lib.rs index af60305..27901fe 100644 --- a/crates/domain/src/lib.rs +++ b/crates/domain/src/lib.rs @@ -1,5 +1,6 @@ pub mod errors; pub mod events; +pub mod hashtag; pub mod models; pub mod ports; pub mod value_objects; diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index d5bac18..bfd0726 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -317,37 +317,43 @@ impl< { } +#[derive(Debug, Clone)] +pub enum FeedScope { + Home { following_ids: Vec }, + Public, + Tag { tag_name: String }, + User { user_id: UserId }, + Search { query: String }, +} + +#[derive(Debug, Clone)] +pub struct FeedQuery { + pub scope: FeedScope, + pub page: PageParams, + pub viewer_id: Option, +} + +impl FeedQuery { + pub fn home(viewer_id: UserId, following_ids: Vec, page: PageParams) -> Self { + Self { scope: FeedScope::Home { following_ids }, page, viewer_id: Some(viewer_id) } + } + pub fn public(page: PageParams, viewer_id: Option) -> Self { + Self { scope: FeedScope::Public, page, viewer_id } + } + pub fn tag(tag_name: impl Into, page: PageParams, viewer_id: Option) -> Self { + Self { scope: FeedScope::Tag { tag_name: tag_name.into() }, page, viewer_id } + } + pub fn user(user_id: UserId, page: PageParams, viewer_id: Option) -> Self { + Self { scope: FeedScope::User { user_id }, page, viewer_id } + } + pub fn search(query: impl Into, page: PageParams, viewer_id: Option) -> Self { + Self { scope: FeedScope::Search { query: query.into() }, page, viewer_id } + } +} + #[async_trait] pub trait FeedRepository: Send + Sync { - async fn home_feed( - &self, - following_ids: &[UserId], - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError>; - async fn public_feed( - &self, - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError>; - async fn search( - &self, - query: &str, - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError>; - async fn tag_feed( - &self, - tag_name: &str, - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError>; - async fn user_feed( - &self, - user_id: &UserId, - page: &PageParams, - viewer_id: Option<&UserId>, - ) -> Result, DomainError>; + async fn query(&self, q: &FeedQuery) -> Result, DomainError>; } #[async_trait] @@ -368,166 +374,6 @@ pub trait SearchPort: Send + Sync { ) -> Result, DomainError>; } -/// AP-protocol endpoints for a locally-stored user (local or interned remote). -#[derive(Debug, Clone)] -pub struct ActorApUrls { - pub ap_id: String, - pub inbox_url: String, -} - -/// A local thought ready for AP serialization, with the author's username -/// pre-joined so the handler can build AP URLs without a second query. -#[derive(Debug, Clone)] -pub struct OutboxEntry { - pub thought: crate::models::thought::Thought, - pub author_username: Username, -} - -#[async_trait] -pub trait ActivityPubRepository: Send + Sync { - // ── Outbox (local → remote) ────────────────────────────────────── - - /// All public local thoughts for this actor. Used for outbox totals - /// and full-collection delivery. - async fn outbox_entries_for_actor( - &self, - user_id: &UserId, - ) -> Result, DomainError>; - - /// Cursor page of public local thoughts, newest-first, before `before`. - /// Used for OrderedCollectionPage responses. - async fn outbox_page_for_actor( - &self, - user_id: &UserId, - before: Option>, - limit: usize, - ) -> Result, DomainError>; - - // ── Remote actor resolution ────────────────────────────────────── - - /// Find the local UserId for a remote actor by its AP URL. - async fn find_remote_actor_id(&self, actor_ap_url: &str) - -> Result, DomainError>; - - /// Ensure a remote actor placeholder exists; create one if absent. - /// Idempotent — safe to call multiple times with the same URL. - async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result; - - /// Update display_name and avatar_url for an already-interned remote actor. - async fn update_remote_actor_display( - &self, - user_id: &UserId, - display_name: Option<&str>, - avatar_url: Option<&str>, - ) -> Result<(), DomainError>; - - // ── Inbox processing (remote → local) ─────────────────────────── - - /// Persist an incoming remote Note. Idempotent on ap_id. - #[allow(clippy::too_many_arguments)] - async fn accept_note( - &self, - ap_id: &str, - author_id: &UserId, - content: &str, - published: chrono::DateTime, - sensitive: bool, - content_warning: Option, - visibility: &str, - in_reply_to: Option<&str>, - ) -> Result<(), DomainError>; - - /// Apply an Update to a previously accepted remote Note. - async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError>; - - /// Remove a specific remote Note (Delete activity). Only touches - /// remotely-originated thoughts. - async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError>; - - /// Remove all Notes from a remote actor (actor-level Delete/Tombstone). - async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError>; - - // ── Node-level stats ───────────────────────────────────────────── - - /// Total locally-authored thought count for NodeInfo responses. - async fn count_local_notes(&self) -> Result; - - /// Return the ActivityPub object URL for a thought, if one is stored. - /// Returns None for local thoughts (caller constructs URL from base_url + thought_id). - async fn get_thought_ap_id( - &self, - thought_id: &ThoughtId, - ) -> Result, DomainError>; - - /// Return the AP actor URL and inbox URL for a user, if stored. - /// Returns None for users that have not been federated. - async fn get_actor_ap_urls(&self, user_id: &UserId) - -> Result, DomainError>; -} - -#[async_trait] -pub trait OutboundFederationPort: Send + Sync { - /// Fan out a new local Note to all accepted followers. - async fn broadcast_create( - &self, - author_user_id: &UserId, - thought: &Thought, - author_username: &str, - in_reply_to_url: Option<&str>, - ) -> Result<(), DomainError>; - - /// Fan out a Delete tombstone for a now-deleted local Note. - /// `thought_ap_id` is pre-constructed by the caller because the thought - /// has already been deleted from the DB when this fires. - async fn broadcast_delete( - &self, - author_user_id: &UserId, - thought_ap_id: &str, - ) -> Result<(), DomainError>; - - /// Fan out an Update(Note) for an edited local thought. - async fn broadcast_update( - &self, - author_user_id: &UserId, - thought: &Thought, - author_username: &str, - in_reply_to_url: Option<&str>, - ) -> Result<(), DomainError>; - - /// Fan out an Announce(object_ap_id) for a boost. - async fn broadcast_announce( - &self, - booster_user_id: &UserId, - object_ap_id: &str, - ) -> Result<(), DomainError>; - - /// Fan out an Undo(Announce) to followers when a boost is removed. - async fn broadcast_undo_announce( - &self, - booster_user_id: &UserId, - object_ap_id: &str, - ) -> Result<(), DomainError>; - - /// Send a Like activity to a remote thought author's inbox. - /// Only called when a LOCAL user likes a REMOTE thought (one with an ap_id). - async fn broadcast_like( - &self, - liker_user_id: &UserId, - object_ap_id: &str, - author_inbox_url: &str, - ) -> Result<(), DomainError>; - - /// Send Undo(Like) to a remote thought author's inbox. - async fn broadcast_undo_like( - &self, - liker_user_id: &UserId, - object_ap_id: &str, - author_inbox_url: &str, - ) -> Result<(), DomainError>; - - /// Fan out an Update(Actor) to all accepted followers after a profile change. - async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError>; -} #[async_trait] pub trait FederationSchedulerPort: Send + Sync { diff --git a/crates/domain/src/testing.rs b/crates/domain/src/testing.rs index ff2c0f8..763b2d9 100644 --- a/crates/domain/src/testing.rs +++ b/crates/domain/src/testing.rs @@ -39,8 +39,6 @@ pub struct TestStore { pub actor_ap_ids: Arc>>, /// ThoughtId → AP object URL (used by get_thought_ap_id) pub thought_ap_ids: Arc>>, - /// UserId → ActorApUrls (used by get_actor_ap_urls) - pub actor_ap_urls: Arc>>, } #[async_trait] @@ -706,63 +704,7 @@ impl RemoteActorConnectionRepository for TestStore { #[async_trait] impl FeedRepository for TestStore { - async fn home_feed( - &self, - _ids: &[UserId], - _p: &PageParams, - _v: Option<&UserId>, - ) -> Result, DomainError> { - Ok(Paginated { - items: vec![], - total: 0, - page: 1, - per_page: 20, - }) - } - async fn public_feed( - &self, - _p: &PageParams, - _v: Option<&UserId>, - ) -> Result, DomainError> { - Ok(Paginated { - items: vec![], - total: 0, - page: 1, - per_page: 20, - }) - } - async fn search( - &self, - _q: &str, - _p: &PageParams, - _v: Option<&UserId>, - ) -> Result, DomainError> { - Ok(Paginated { - items: vec![], - total: 0, - page: 1, - per_page: 20, - }) - } - async fn tag_feed( - &self, - _tag_name: &str, - _page: &PageParams, - _viewer_id: Option<&UserId>, - ) -> Result, DomainError> { - Ok(Paginated { - items: vec![], - total: 0, - page: 1, - per_page: 20, - }) - } - async fn user_feed( - &self, - _user_id: &UserId, - _page: &PageParams, - _viewer_id: Option<&UserId>, - ) -> Result, DomainError> { + async fn query(&self, _q: &crate::ports::FeedQuery) -> Result, DomainError> { Ok(Paginated { items: vec![], total: 0, @@ -801,109 +743,6 @@ impl SearchPort for TestStore { } } -#[async_trait] -impl ActivityPubRepository for TestStore { - async fn outbox_entries_for_actor( - &self, - _uid: &UserId, - ) -> Result, DomainError> { - Ok(vec![]) - } - async fn outbox_page_for_actor( - &self, - _uid: &UserId, - _before: Option>, - _limit: usize, - ) -> Result, DomainError> { - Ok(vec![]) - } - async fn find_remote_actor_id( - &self, - actor_ap_url: &str, - ) -> Result, DomainError> { - Ok(self.actor_ap_ids.lock().unwrap().get(actor_ap_url).cloned()) - } - async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result { - if let Some(uid) = self.find_remote_actor_id(actor_ap_url).await? { - return Ok(uid); - } - let uid = UserId::new(); - let handle = url::Url::parse(actor_ap_url) - .map(|u| u.path().trim_start_matches('/').replace('/', "_")) - .unwrap_or_else(|_| format!("remote_{}", &uid.to_string()[..8])); - let user = crate::models::user::User { - id: uid.clone(), - username: Username::from_trusted(handle.clone()), - email: Email::from_trusted(format!("{}@remote", uid)), - password_hash: PasswordHash("".into()), - display_name: None, - bio: None, - avatar_url: None, - header_url: None, - custom_css: None, - local: false, - created_at: chrono::Utc::now(), - updated_at: chrono::Utc::now(), - }; - self.users.lock().unwrap().push(user); - self.actor_ap_ids - .lock() - .unwrap() - .insert(actor_ap_url.to_string(), uid.clone()); - Ok(uid) - } - async fn update_remote_actor_display( - &self, - _user_id: &UserId, - _display_name: Option<&str>, - _avatar_url: Option<&str>, - ) -> Result<(), DomainError> { - Ok(()) - } - async fn accept_note( - &self, - _ap_id: &str, - _author_id: &UserId, - _content: &str, - _published: chrono::DateTime, - _sensitive: bool, - _content_warning: Option, - _visibility: &str, - _in_reply_to: Option<&str>, - ) -> Result<(), DomainError> { - Ok(()) - } - async fn apply_note_update(&self, _ap_id: &str, _new_content: &str) -> Result<(), DomainError> { - Ok(()) - } - async fn retract_note(&self, _ap_id: &str) -> Result<(), DomainError> { - Ok(()) - } - async fn retract_actor_notes(&self, _actor_ap_url: &str) -> Result<(), DomainError> { - Ok(()) - } - async fn count_local_notes(&self) -> Result { - Ok(self - .thoughts - .lock() - .unwrap() - .iter() - .filter(|t| t.local) - .count() as u64) - } - async fn get_thought_ap_id( - &self, - thought_id: &ThoughtId, - ) -> Result, DomainError> { - Ok(self.thought_ap_ids.lock().unwrap().get(thought_id).cloned()) - } - async fn get_actor_ap_urls( - &self, - user_id: &UserId, - ) -> Result, DomainError> { - Ok(self.actor_ap_urls.lock().unwrap().get(user_id).cloned()) - } -} #[async_trait] impl FederationSchedulerPort for TestStore { @@ -964,31 +803,6 @@ impl OutboxWriter for NoOpOutboxWriter { } } -#[cfg(test)] -mod ap_repo_tests { - use super::*; - use crate::value_objects::UserId; - - #[tokio::test] - async fn test_store_outbox_returns_empty() { - let store = TestStore::default(); - let result = store - .outbox_entries_for_actor(&UserId::new()) - .await - .unwrap(); - assert!(result.is_empty()); - } - - #[tokio::test] - async fn test_store_intern_creates_placeholder() { - let store = TestStore::default(); - let url = "https://example.com/users/alice"; - let id1 = store.intern_remote_actor(url).await.unwrap(); - let id2 = store.intern_remote_actor(url).await.unwrap(); - assert_eq!(id1, id2, "intern must be idempotent"); - } -} - #[cfg(test)] mod federation_port_tests { use super::*; diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 6626243..7ff52b9 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -4,8 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] -domain = { workspace = true } -application = { workspace = true } +domain = { workspace = true } +activitypub-base = { workspace = true } +application = { workspace = true } api-types = { workspace = true } axum = { workspace = true } tower-http = { workspace = true } diff --git a/crates/presentation/src/extractors.rs b/crates/presentation/src/extractors.rs index 5de0dd2..55fc94d 100644 --- a/crates/presentation/src/extractors.rs +++ b/crates/presentation/src/extractors.rs @@ -2,6 +2,27 @@ use crate::{errors::ApiError, state::AppState}; use axum::{extract::FromRequestParts, http::request::Parts}; use domain::value_objects::UserId; +// --------------------------------------------------------------------------- +// Deps extractor — narrows AppState to a handler-specific deps struct +// --------------------------------------------------------------------------- + +pub struct Deps(pub S); + +pub trait FromAppState: Sized { + fn from_state(s: &AppState) -> Self; +} + +impl FromRequestParts for Deps { + type Rejection = std::convert::Infallible; + + async fn from_request_parts( + _parts: &mut Parts, + state: &AppState, + ) -> Result { + Ok(Deps(S::from_state(state))) + } +} + pub struct AuthUser(pub UserId); pub struct OptionalAuthUser(pub Option); diff --git a/crates/presentation/src/handlers/api_keys.rs b/crates/presentation/src/handlers/api_keys.rs index e75728c..61099b8 100644 --- a/crates/presentation/src/handlers/api_keys.rs +++ b/crates/presentation/src/handlers/api_keys.rs @@ -1,23 +1,40 @@ -use crate::{errors::ApiError, extractors::AuthUser, state::AppState}; +use crate::{ + errors::ApiError, + extractors::{AuthUser, Deps, FromAppState}, + state::AppState, +}; use api_types::{ requests::CreateApiKeyRequest, responses::{ApiKeyResponse, CreatedApiKeyResponse}, }; use application::use_cases::api_keys::{create_api_key, delete_api_key, list_api_keys}; use axum::{ - extract::{Path, State}, + extract::Path, http::StatusCode, Json, }; -use domain::value_objects::ApiKeyId; +use domain::{ports::ApiKeyRepository, value_objects::ApiKeyId}; +use std::sync::Arc; use uuid::Uuid; +pub struct ApiKeysDeps { + pub api_keys: Arc, +} + +impl FromAppState for ApiKeysDeps { + fn from_state(s: &AppState) -> Self { + Self { + api_keys: s.api_keys.clone(), + } + } +} + #[utoipa::path(get, path = "/api-keys", responses((status = 200, description = "API keys", body = Vec)), security(("bearer_auth" = [])))] pub async fn get_api_keys( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, ) -> Result>, ApiError> { - let keys = list_api_keys(&*s.api_keys, &uid).await?; + let keys = list_api_keys(&*d.api_keys, &uid).await?; Ok(Json( keys.into_iter() .map(|k| ApiKeyResponse { @@ -30,21 +47,21 @@ pub async fn get_api_keys( } #[utoipa::path(post, path = "/api-keys", request_body = CreateApiKeyRequest, responses((status = 200, description = "Created — raw key shown once", body = CreatedApiKeyResponse)), security(("bearer_auth" = [])))] pub async fn post_api_key( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, ) -> Result, ApiError> { - let (key, raw) = create_api_key(&*s.api_keys, &uid, body.name).await?; + let (key, raw) = create_api_key(&*d.api_keys, &uid, body.name).await?; Ok(Json( serde_json::json!({ "id": key.id.as_uuid(), "name": key.name, "key": raw }), )) } #[utoipa::path(delete, path = "/api-keys/{id}", params(("id" = uuid::Uuid, Path, description = "Key ID")), responses((status = 204, description = "Deleted")), security(("bearer_auth" = [])))] pub async fn delete_api_key_handler( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(id): Path, ) -> Result { - delete_api_key(&*s.api_keys, &uid, &ApiKeyId::from_uuid(id)).await?; + delete_api_key(&*d.api_keys, &uid, &ApiKeyId::from_uuid(id)).await?; Ok(StatusCode::NO_CONTENT) } diff --git a/crates/presentation/src/handlers/auth.rs b/crates/presentation/src/handlers/auth.rs index 400df36..d273c96 100644 --- a/crates/presentation/src/handlers/auth.rs +++ b/crates/presentation/src/handlers/auth.rs @@ -1,10 +1,34 @@ -use crate::{errors::ApiError, state::AppState}; +use crate::{ + errors::ApiError, + extractors::{Deps, FromAppState}, + state::AppState, +}; use api_types::{ requests::{LoginRequest, RegisterRequest}, responses::{AuthResponse, ErrorResponse, UserResponse}, }; use application::use_cases::auth::{login, register, LoginInput, RegisterInput}; -use axum::{extract::State, http::StatusCode, response::IntoResponse, Json}; +use axum::{http::StatusCode, response::IntoResponse, Json}; +use domain::ports::{AuthService, EventPublisher, PasswordHasher, UserRepository}; +use std::sync::Arc; + +pub struct AuthDeps { + pub users: Arc, + pub hasher: Arc, + pub auth: Arc, + pub events: Arc, +} + +impl FromAppState for AuthDeps { + fn from_state(s: &AppState) -> Self { + Self { + users: s.users.clone(), + hasher: s.hasher.clone(), + auth: s.auth.clone(), + events: s.events.clone(), + } + } +} pub fn to_user_response(u: &domain::models::user::User) -> UserResponse { UserResponse { @@ -31,14 +55,14 @@ pub fn to_user_response(u: &domain::models::user::User) -> UserResponse { ) )] pub async fn post_register( - State(s): State, + Deps(d): Deps, Json(body): Json, ) -> Result { let out = register( - &*s.users, - &*s.hasher, - &*s.auth, - &*s.events, + &*d.users, + &*d.hasher, + &*d.auth, + &*d.events, RegisterInput { username: body.username, email: body.email, @@ -62,13 +86,13 @@ pub async fn post_register( ) )] pub async fn post_login( - State(s): State, + Deps(d): Deps, Json(body): Json, ) -> Result { let out = login( - &*s.users, - &*s.hasher, - &*s.auth, + &*d.users, + &*d.hasher, + &*d.auth, LoginInput { email: body.email, password: body.password, diff --git a/crates/presentation/src/handlers/federation_actors.rs b/crates/presentation/src/handlers/federation_actors.rs index 2811e42..8d15177 100644 --- a/crates/presentation/src/handlers/federation_actors.rs +++ b/crates/presentation/src/handlers/federation_actors.rs @@ -1,5 +1,7 @@ use crate::{ - errors::ApiError, extractors::OptionalAuthUser, handlers::feed::to_thought_response, + errors::ApiError, + extractors::{Deps, FromAppState, OptionalAuthUser}, + handlers::feed::to_thought_response, state::AppState, }; use api_types::{ @@ -10,13 +12,41 @@ use application::use_cases::federation_management::{ get_actor_connections_page, get_remote_actor_posts, }; use axum::{ - extract::{Path, Query, State}, + extract::{Path, Query}, Json, }; -use domain::models::feed::PageParams; +use activitypub_base::ActivityPubRepository; +use domain::{ + models::feed::PageParams, + ports::{ + FederationActionPort, FederationSchedulerPort, FeedRepository, + RemoteActorConnectionRepository, + }, +}; +use std::sync::Arc; + +pub struct FederationActorsDeps { + pub federation: Arc, + pub ap_repo: Arc, + pub feed: Arc, + pub federation_scheduler: Arc, + pub remote_actor_connections: Arc, +} + +impl FromAppState for FederationActorsDeps { + fn from_state(s: &AppState) -> Self { + Self { + federation: s.federation.clone(), + ap_repo: s.ap_repo.clone(), + feed: s.feed.clone(), + federation_scheduler: s.federation_scheduler.clone(), + remote_actor_connections: s.remote_actor_connections.clone(), + } + } +} pub async fn remote_actor_posts_handler( - State(s): State, + Deps(d): Deps, Path(handle): Path, Query(q): Query, OptionalAuthUser(viewer): OptionalAuthUser, @@ -26,10 +56,10 @@ pub async fn remote_actor_posts_handler( per_page: q.per_page(), }; let result = get_remote_actor_posts( - &*s.federation, - &*s.ap_repo, - &*s.feed, - &*s.federation_scheduler, + &*d.federation, + &*d.ap_repo, + &*d.feed, + &*d.federation_scheduler, &handle, page, viewer.as_ref(), @@ -44,31 +74,31 @@ pub async fn remote_actor_posts_handler( } pub async fn actor_followers_handler( - State(s): State, + Deps(d): Deps, Path(handle): Path, Query(q): Query, ) -> Result, ApiError> { - actor_connections_handler(s, handle, "followers", q.page() as u32).await + actor_connections_handler(d, handle, "followers", q.page() as u32).await } pub async fn actor_following_handler( - State(s): State, + Deps(d): Deps, Path(handle): Path, Query(q): Query, ) -> Result, ApiError> { - actor_connections_handler(s, handle, "following", q.page() as u32).await + actor_connections_handler(d, handle, "following", q.page() as u32).await } async fn actor_connections_handler( - s: AppState, + d: FederationActorsDeps, handle: String, connection_type: &str, page: u32, ) -> Result, ApiError> { let (items, has_more) = get_actor_connections_page( - &*s.federation, - &*s.remote_actor_connections, - &*s.federation_scheduler, + &*d.federation, + &*d.remote_actor_connections, + &*d.federation_scheduler, &handle, connection_type, page, diff --git a/crates/presentation/src/handlers/federation_management.rs b/crates/presentation/src/handlers/federation_management.rs index cd8aa4a..705f2b6 100644 --- a/crates/presentation/src/handlers/federation_management.rs +++ b/crates/presentation/src/handlers/federation_management.rs @@ -1,11 +1,17 @@ -use crate::{errors::ApiError, extractors::AuthUser, state::AppState}; +use crate::{ + errors::ApiError, + extractors::{AuthUser, Deps, FromAppState}, + state::AppState, +}; use api_types::responses::{ProfileField, RemoteActorResponse}; use application::use_cases::federation_management::{ accept_follow_request, list_pending_requests, list_remote_followers, list_remote_following, reject_follow_request, remove_remote_following, }; -use axum::{extract::State, http::StatusCode, Json}; +use axum::{http::StatusCode, Json}; +use domain::ports::{EventPublisher, FederationActionPort, FollowRepository, UserRepository}; use serde::Deserialize; +use std::sync::Arc; #[derive(Deserialize)] pub struct ActorUrlBody { @@ -17,6 +23,24 @@ pub struct HandleBody { pub handle: String, } +pub struct FederationManagementDeps { + pub federation: Arc, + pub follows: Arc, + pub users: Arc, + pub events: Arc, +} + +impl FromAppState for FederationManagementDeps { + fn from_state(s: &AppState) -> Self { + Self { + federation: s.federation.clone(), + follows: s.follows.clone(), + users: s.users.clone(), + events: s.events.clone(), + } + } +} + fn to_response(a: domain::models::remote_actor::RemoteActor) -> RemoteActorResponse { RemoteActorResponse { handle: a.handle, @@ -38,57 +62,57 @@ fn to_response(a: domain::models::remote_actor::RemoteActor) -> RemoteActorRespo } pub async fn get_pending_requests( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, ) -> Result>, ApiError> { - let actors = list_pending_requests(&*s.federation, &uid).await?; + let actors = list_pending_requests(&*d.federation, &uid).await?; Ok(Json(actors.into_iter().map(to_response).collect())) } pub async fn post_accept_request( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, ) -> Result { - accept_follow_request(&*s.federation, &uid, &body.actor_url).await?; + accept_follow_request(&*d.federation, &uid, &body.actor_url).await?; Ok(StatusCode::NO_CONTENT) } pub async fn delete_follower( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, ) -> Result { - reject_follow_request(&*s.federation, &uid, &body.actor_url).await?; + reject_follow_request(&*d.federation, &uid, &body.actor_url).await?; Ok(StatusCode::NO_CONTENT) } pub async fn get_remote_followers( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, ) -> Result>, ApiError> { - let actors = list_remote_followers(&*s.federation, &uid).await?; + let actors = list_remote_followers(&*d.federation, &uid).await?; Ok(Json(actors.into_iter().map(to_response).collect())) } pub async fn get_remote_following( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, ) -> Result>, ApiError> { - let actors = list_remote_following(&*s.federation, &uid).await?; + let actors = list_remote_following(&*d.federation, &uid).await?; Ok(Json(actors.into_iter().map(to_response).collect())) } pub async fn delete_following( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, ) -> Result { remove_remote_following( - &*s.follows, - &*s.users, - &*s.federation, - &*s.events, + &*d.follows, + &*d.users, + &*d.federation, + &*d.events, &uid, &body.handle, ) diff --git a/crates/presentation/src/handlers/feed.rs b/crates/presentation/src/handlers/feed.rs index 3a03c9f..c7f7407 100644 --- a/crates/presentation/src/handlers/feed.rs +++ b/crates/presentation/src/handlers/feed.rs @@ -1,6 +1,6 @@ use crate::{ errors::ApiError, - extractors::{AuthUser, OptionalAuthUser}, + extractors::{Deps, FromAppState, OptionalAuthUser, AuthUser}, handlers::auth::to_user_response, state::AppState, }; @@ -12,12 +12,38 @@ use application::use_cases::feed::{ }; use application::use_cases::profile::{get_user_by_id_or_username, get_user_by_username}; use axum::{ - extract::{Path, Query, State}, + extract::{Path, Query}, http::{header, HeaderMap}, response::{IntoResponse, Response}, Json, }; -use domain::models::feed::PageParams; +use domain::{ + models::feed::PageParams, + ports::{FederationActionPort, FeedRepository, FollowRepository, SearchPort, TagRepository, UserRepository}, +}; +use std::sync::Arc; + +pub struct FeedDeps { + pub feed: Arc, + pub follows: Arc, + pub search: Arc, + pub federation: Arc, + pub users: Arc, + pub tags: Arc, +} + +impl FromAppState for FeedDeps { + fn from_state(s: &AppState) -> Self { + Self { + feed: s.feed.clone(), + follows: s.follows.clone(), + search: s.search.clone(), + federation: s.federation.clone(), + users: s.users.clone(), + tags: s.tags.clone(), + } + } +} pub fn to_thought_response(e: &domain::models::feed::FeedEntry) -> ThoughtResponse { ThoughtResponse { @@ -46,7 +72,7 @@ pub fn to_thought_response(e: &domain::models::feed::FeedEntry) -> ThoughtRespon security(("bearer_auth" = [])) )] pub async fn home_feed( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Query(q): Query, ) -> Result, ApiError> { @@ -54,7 +80,7 @@ pub async fn home_feed( page: q.page(), per_page: q.per_page(), }; - let result = get_home_feed(&*s.feed, &*s.follows, &uid, page).await?; + let result = get_home_feed(&*d.feed, &*d.follows, &uid, page).await?; Ok(Json(serde_json::json!({ "items": result.items.iter().map(to_thought_response).collect::>(), "total": result.total, @@ -69,7 +95,7 @@ pub async fn home_feed( responses((status = 200, description = "Public feed")) )] pub async fn public_feed( - State(s): State, + Deps(d): Deps, OptionalAuthUser(viewer): OptionalAuthUser, Query(q): Query, ) -> Result, ApiError> { @@ -77,7 +103,7 @@ pub async fn public_feed( page: q.page(), per_page: q.per_page(), }; - let result = get_public_feed(&*s.feed, viewer.as_ref(), page).await?; + let result = get_public_feed(&*d.feed, viewer.as_ref(), page).await?; Ok(Json(serde_json::json!({ "items": result.items.iter().map(to_thought_response).collect::>(), "total": result.total, @@ -92,7 +118,7 @@ pub async fn public_feed( responses((status = 200, description = "Search results: thoughts and users")) )] pub async fn search_handler( - State(s): State, + Deps(d): Deps, OptionalAuthUser(viewer): OptionalAuthUser, Query(q): Query, ) -> Result, ApiError> { @@ -103,8 +129,8 @@ pub async fn search_handler( let query = q.q.trim().to_string(); let (thoughts_result, users_result) = tokio::join!( - s.search.search_thoughts(&query, &page, viewer.as_ref()), - s.search.search_users(&query, &page), + d.search.search_thoughts(&query, &page, viewer.as_ref()), + d.search.search_users(&query, &page), ); let thoughts = thoughts_result? @@ -127,7 +153,7 @@ pub async fn search_handler( } pub async fn get_following_handler( - State(s): State, + Deps(d): Deps, Path(param): Path, Query(q): Query, headers: HeaderMap, @@ -138,22 +164,22 @@ pub async fn get_following_handler( .unwrap_or(""); if accept.contains("application/activity+json") { - let user = get_user_by_id_or_username(&*s.users, ¶m).await?; + let user = get_user_by_id_or_username(&*d.users, ¶m).await?; let user_id = user.id; let page = q.page().try_into().ok(); - let json = s + let json = d .federation .following_collection_json(&user_id, page) .await?; return Ok(([(header::CONTENT_TYPE, "application/activity+json")], json).into_response()); } - let user = get_user_by_username(&*s.users, ¶m).await?; + let user = get_user_by_username(&*d.users, ¶m).await?; let page = PageParams { page: q.page(), per_page: q.per_page(), }; - let result = get_following(&*s.follows, &user.id, page).await?; + let result = get_following(&*d.follows, &user.id, page).await?; Ok(Json(serde_json::json!({ "total": result.total, "items": result.items.iter().map(to_user_response).collect::>() @@ -162,7 +188,7 @@ pub async fn get_following_handler( } pub async fn get_followers_handler( - State(s): State, + Deps(d): Deps, Path(param): Path, Query(q): Query, headers: HeaderMap, @@ -173,22 +199,22 @@ pub async fn get_followers_handler( .unwrap_or(""); if accept.contains("application/activity+json") { - let user = get_user_by_id_or_username(&*s.users, ¶m).await?; + let user = get_user_by_id_or_username(&*d.users, ¶m).await?; let user_id = user.id; let page = q.page().try_into().ok(); - let json = s + let json = d .federation .followers_collection_json(&user_id, page) .await?; return Ok(([(header::CONTENT_TYPE, "application/activity+json")], json).into_response()); } - let user = get_user_by_username(&*s.users, ¶m).await?; + let user = get_user_by_username(&*d.users, ¶m).await?; let page = PageParams { page: q.page(), per_page: q.per_page(), }; - let result = get_followers(&*s.follows, &user.id, page).await?; + let result = get_followers(&*d.follows, &user.id, page).await?; Ok(Json(serde_json::json!({ "total": result.total, "items": result.items.iter().map(to_user_response).collect::>() @@ -205,17 +231,17 @@ pub async fn get_followers_handler( responses((status = 200, description = "User's public thoughts")) )] pub async fn user_thoughts_handler( - State(s): State, + Deps(d): Deps, Path(username): Path, OptionalAuthUser(viewer): OptionalAuthUser, Query(q): Query, ) -> Result, ApiError> { - let user = get_user_by_username(&*s.users, &username).await?; + let user = get_user_by_username(&*d.users, &username).await?; let page = PageParams { page: q.page(), per_page: q.per_page(), }; - let result = get_user_feed(&*s.feed, &user.id, page, viewer.as_ref()).await?; + let result = get_user_feed(&*d.feed, &user.id, page, viewer.as_ref()).await?; Ok(Json(serde_json::json!({ "total": result.total, "page": result.page, @@ -225,7 +251,7 @@ pub async fn user_thoughts_handler( } pub async fn get_popular_tags( - State(s): State, + Deps(d): Deps, Query(params): Query>, ) -> Result, ApiError> { let limit: usize = params @@ -233,7 +259,7 @@ pub async fn get_popular_tags( .and_then(|v| v.parse().ok()) .unwrap_or(api_types::requests::DEFAULT_PER_PAGE as usize); let tags = uc_get_popular_tags( - &*s.tags, + &*d.tags, limit.min(api_types::requests::MAX_PER_PAGE as usize), ) .await?; @@ -254,7 +280,7 @@ pub async fn get_popular_tags( responses((status = 200, description = "Thoughts with this tag")) )] pub async fn tag_thoughts_handler( - State(s): State, + Deps(d): Deps, Path(tag_name): Path, OptionalAuthUser(viewer): OptionalAuthUser, Query(q): Query, @@ -263,7 +289,7 @@ pub async fn tag_thoughts_handler( page: q.page(), per_page: q.per_page(), }; - let result = get_by_tag(&*s.feed, &tag_name, page, viewer.as_ref()).await?; + let result = get_by_tag(&*d.feed, &tag_name, page, viewer.as_ref()).await?; Ok(Json(serde_json::json!({ "tag": tag_name, "total": result.total, diff --git a/crates/presentation/src/handlers/health.rs b/crates/presentation/src/handlers/health.rs index de19c1a..7f2904d 100644 --- a/crates/presentation/src/handlers/health.rs +++ b/crates/presentation/src/handlers/health.rs @@ -1,9 +1,26 @@ -use crate::state::AppState; -use axum::{extract::State, Json}; +use crate::{ + extractors::{Deps, FromAppState}, + state::AppState, +}; +use axum::Json; +use domain::ports::UserRepository; +use std::sync::Arc; + +pub struct HealthDeps { + pub users: Arc, +} + +impl FromAppState for HealthDeps { + fn from_state(s: &AppState) -> Self { + Self { + users: s.users.clone(), + } + } +} #[utoipa::path(get, path = "/health", responses((status = 200, description = "Service health status")))] -pub async fn health_handler(State(s): State) -> Json { - let db_ok = s.users.list_with_stats().await.is_ok(); +pub async fn health_handler(Deps(d): Deps) -> Json { + let db_ok = d.users.list_with_stats().await.is_ok(); Json(serde_json::json!({ "status": if db_ok { "ok" } else { "degraded" }, "db": if db_ok { "connected" } else { "error" }, diff --git a/crates/presentation/src/handlers/notifications.rs b/crates/presentation/src/handlers/notifications.rs index 69b5e1a..f24b1ee 100644 --- a/crates/presentation/src/handlers/notifications.rs +++ b/crates/presentation/src/handlers/notifications.rs @@ -1,28 +1,47 @@ -use crate::{errors::ApiError, extractors::AuthUser, state::AppState}; +use crate::{ + errors::ApiError, + extractors::{AuthUser, Deps, FromAppState}, + state::AppState, +}; use api_types::requests::NotificationUpdateRequest; use application::use_cases::notifications::{ count_unread_notifications, list_notifications as uc_list_notifications, mark_all_notifications_read, mark_notification_read as uc_mark_notification_read, }; use axum::{ - extract::{Path, State}, + extract::Path, http::StatusCode, Json, }; -use domain::{models::feed::PageParams, value_objects::NotificationId}; +use domain::{ + models::feed::PageParams, ports::NotificationRepository, value_objects::NotificationId, +}; +use std::sync::Arc; use uuid::Uuid; +pub struct NotificationsDeps { + pub notifications: Arc, +} + +impl FromAppState for NotificationsDeps { + fn from_state(s: &AppState) -> Self { + Self { + notifications: s.notifications.clone(), + } + } +} + #[utoipa::path(get, path = "/notifications", responses((status = 200, description = "Notification summary")), security(("bearer_auth" = [])))] pub async fn list_notifications( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, ) -> Result, ApiError> { let page = PageParams { page: 1, per_page: 20, }; - let result = uc_list_notifications(&*s.notifications, &uid, page).await?; - let unread = count_unread_notifications(&*s.notifications, &uid).await?; + let result = uc_list_notifications(&*d.notifications, &uid, page).await?; + let unread = count_unread_notifications(&*d.notifications, &uid).await?; Ok(Json(serde_json::json!({ "total": result.total, "unread": unread @@ -31,13 +50,13 @@ pub async fn list_notifications( #[utoipa::path(patch, path = "/notifications/{id}", params(("id" = uuid::Uuid, Path, description = "Notification ID")), request_body = NotificationUpdateRequest, responses((status = 204, description = "Marked read")), security(("bearer_auth" = [])))] pub async fn mark_notification_read( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(id): Path, Json(body): Json, ) -> Result { uc_mark_notification_read( - &*s.notifications, + &*d.notifications, &NotificationId::from_uuid(id), &uid, body.read, @@ -48,11 +67,11 @@ pub async fn mark_notification_read( #[utoipa::path(patch, path = "/notifications", request_body = NotificationUpdateRequest, responses((status = 204, description = "All marked read")), security(("bearer_auth" = [])))] pub async fn mark_all_read( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, ) -> Result { - mark_all_notifications_read(&*s.notifications, &uid, body.read).await?; + mark_all_notifications_read(&*d.notifications, &uid, body.read).await?; Ok(StatusCode::NO_CONTENT) } diff --git a/crates/presentation/src/handlers/social.rs b/crates/presentation/src/handlers/social.rs index 632448e..b3bc334 100644 --- a/crates/presentation/src/handlers/social.rs +++ b/crates/presentation/src/handlers/social.rs @@ -1,49 +1,86 @@ -use crate::{errors::ApiError, extractors::AuthUser, state::AppState}; +use crate::{ + errors::ApiError, + extractors::{AuthUser, Deps, FromAppState}, + state::AppState, +}; use api_types::requests::SetTopFriendsRequest; use application::use_cases::profile::{get_top_friends, get_user_by_username, set_top_friends}; use application::use_cases::social::*; use axum::{ - extract::{Path, State}, + extract::Path, http::StatusCode, Json, }; -use domain::value_objects::{ThoughtId, UserId}; +use domain::{ + ports::{ + BlockRepository, BoostRepository, EventPublisher, FederationActionPort, FollowRepository, + LikeRepository, TopFriendRepository, UserRepository, + }, + value_objects::{ThoughtId, UserId}, +}; +use std::sync::Arc; use uuid::Uuid; +pub struct SocialDeps { + pub likes: Arc, + pub boosts: Arc, + pub follows: Arc, + pub users: Arc, + pub federation: Arc, + pub events: Arc, + pub blocks: Arc, + pub top_friends: Arc, +} + +impl FromAppState for SocialDeps { + fn from_state(s: &AppState) -> Self { + Self { + likes: s.likes.clone(), + boosts: s.boosts.clone(), + follows: s.follows.clone(), + users: s.users.clone(), + federation: s.federation.clone(), + events: s.events.clone(), + blocks: s.blocks.clone(), + top_friends: s.top_friends.clone(), + } + } +} + #[utoipa::path(post, path = "/thoughts/{id}/like", params(("id" = uuid::Uuid, Path, description = "Thought ID")), responses((status = 204, description = "Liked")), security(("bearer_auth" = [])))] pub async fn post_like( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(id): Path, ) -> Result { - like_thought(&*s.likes, &*s.events, &uid, &ThoughtId::from_uuid(id)).await?; + like_thought(&*d.likes, &*d.events, &uid, &ThoughtId::from_uuid(id)).await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path(delete, path = "/thoughts/{id}/like", params(("id" = uuid::Uuid, Path, description = "Thought ID")), responses((status = 204, description = "Unliked")), security(("bearer_auth" = [])))] pub async fn delete_like( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(id): Path, ) -> Result { - unlike_thought(&*s.likes, &*s.events, &uid, &ThoughtId::from_uuid(id)).await?; + unlike_thought(&*d.likes, &*d.events, &uid, &ThoughtId::from_uuid(id)).await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path(post, path = "/thoughts/{id}/boost", params(("id" = uuid::Uuid, Path, description = "Thought ID")), responses((status = 204, description = "Boosted")), security(("bearer_auth" = [])))] pub async fn post_boost( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(id): Path, ) -> Result { - boost_thought(&*s.boosts, &*s.events, &uid, &ThoughtId::from_uuid(id)).await?; + boost_thought(&*d.boosts, &*d.events, &uid, &ThoughtId::from_uuid(id)).await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path(delete, path = "/thoughts/{id}/boost", params(("id" = uuid::Uuid, Path, description = "Thought ID")), responses((status = 204, description = "Unboosted")), security(("bearer_auth" = [])))] pub async fn delete_boost( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(id): Path, ) -> Result { - unboost_thought(&*s.boosts, &*s.events, &uid, &ThoughtId::from_uuid(id)).await?; + unboost_thought(&*d.boosts, &*d.events, &uid, &ThoughtId::from_uuid(id)).await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path( @@ -53,15 +90,15 @@ pub async fn delete_boost( security(("bearer_auth" = [])) )] pub async fn post_follow( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(username): Path, ) -> Result { follow_actor( - &*s.follows, - &*s.users, - &*s.federation, - &*s.events, + &*d.follows, + &*d.users, + &*d.federation, + &*d.events, &uid, &username, ) @@ -75,15 +112,15 @@ pub async fn post_follow( security(("bearer_auth" = [])) )] pub async fn delete_follow( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(username): Path, ) -> Result { unfollow_actor( - &*s.follows, - &*s.users, - &*s.federation, - &*s.events, + &*d.follows, + &*d.users, + &*d.federation, + &*d.events, &uid, &username, ) @@ -92,39 +129,39 @@ pub async fn delete_follow( } #[utoipa::path(post, path = "/users/{username}/block", params(("username" = String, Path, description = "Username")), responses((status = 204, description = "Blocked")), security(("bearer_auth" = [])))] pub async fn post_block( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(username): Path, ) -> Result { - block_by_username(&*s.blocks, &*s.users, &*s.events, &uid, &username).await?; + block_by_username(&*d.blocks, &*d.users, &*d.events, &uid, &username).await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path(delete, path = "/users/{username}/block", params(("username" = String, Path, description = "Username")), responses((status = 204, description = "Unblocked")), security(("bearer_auth" = [])))] pub async fn delete_block( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(username): Path, ) -> Result { - unblock_by_username(&*s.blocks, &*s.users, &*s.events, &uid, &username).await?; + unblock_by_username(&*d.blocks, &*d.users, &*d.events, &uid, &username).await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path(put, path = "/users/me/top-friends", request_body = SetTopFriendsRequest, responses((status = 204, description = "Top friends updated")), security(("bearer_auth" = [])))] pub async fn put_top_friends( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, ) -> Result { let ids: Vec = body.friend_ids.into_iter().map(UserId::from_uuid).collect(); - set_top_friends(&*s.top_friends, &uid, ids).await?; + set_top_friends(&*d.top_friends, &uid, ids).await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path(get, path = "/users/{username}/top-friends", params(("username" = String, Path, description = "Username")), responses((status = 200, description = "Top friends list")))] pub async fn get_top_friends_handler( - State(s): State, + Deps(d): Deps, Path(username): Path, ) -> Result, ApiError> { - let user = get_user_by_username(&*s.users, &username).await?; - let friends = get_top_friends(&*s.top_friends, &user.id).await?; + let user = get_user_by_username(&*d.users, &username).await?; + let friends = get_top_friends(&*d.top_friends, &user.id).await?; let usernames: Vec<&str> = friends.iter().map(|(_, u)| u.username.as_str()).collect(); Ok(Json(serde_json::json!({ "topFriends": usernames }))) } diff --git a/crates/presentation/src/handlers/thoughts.rs b/crates/presentation/src/handlers/thoughts.rs index 87c5b9c..1ff33ba 100644 --- a/crates/presentation/src/handlers/thoughts.rs +++ b/crates/presentation/src/handlers/thoughts.rs @@ -1,6 +1,6 @@ use crate::{ errors::ApiError, - extractors::{AuthUser, OptionalAuthUser}, + extractors::{AuthUser, Deps, FromAppState, OptionalAuthUser}, handlers::auth::to_user_response, state::AppState, }; @@ -12,14 +12,38 @@ use application::use_cases::thoughts::{ create_thought, delete_thought, edit_thought, get_thought, get_thread, CreateThoughtInput, }; use axum::{ - extract::{Path, State}, + extract::Path, http::StatusCode, response::IntoResponse, Json, }; -use domain::value_objects::ThoughtId; +use domain::{ + ports::{EventPublisher, OutboxWriter, TagRepository, ThoughtRepository, UserRepository}, + value_objects::ThoughtId, +}; +use std::sync::Arc; use uuid::Uuid; +pub struct ThoughtsDeps { + pub thoughts: Arc, + pub users: Arc, + pub tags: Arc, + pub events: Arc, + pub outbox: Arc, +} + +impl FromAppState for ThoughtsDeps { + fn from_state(s: &AppState) -> Self { + Self { + thoughts: s.thoughts.clone(), + users: s.users.clone(), + tags: s.tags.clone(), + events: s.events.clone(), + outbox: s.outbox.clone(), + } + } +} + fn thought_to_json( t: &domain::models::thought::Thought, author: &domain::models::user::User, @@ -56,17 +80,17 @@ fn thought_to_json( security(("bearer_auth" = [])) )] pub async fn post_thought( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, ) -> Result { let in_reply_to = body.in_reply_to_id.map(ThoughtId::from_uuid); let out = create_thought( - &*s.thoughts, - &*s.users, - &*s.tags, - &*s.events, - &*s.outbox, + &*d.thoughts, + &*d.users, + &*d.tags, + &*d.events, + &*d.outbox, CreateThoughtInput { user_id: uid.clone(), content: body.content, @@ -77,7 +101,7 @@ pub async fn post_thought( }, ) .await?; - let author = s + let author = d .users .find_by_id(&uid) .await? @@ -97,12 +121,12 @@ pub async fn post_thought( ) )] pub async fn get_thought_handler( - State(s): State, + Deps(d): Deps, Path(id): Path, OptionalAuthUser(_viewer): OptionalAuthUser, ) -> Result, ApiError> { - let thought = get_thought(&*s.thoughts, &ThoughtId::from_uuid(id)).await?; - let author = s + let thought = get_thought(&*d.thoughts, &ThoughtId::from_uuid(id)).await?; + let author = d .users .find_by_id(&thought.user_id) .await? @@ -121,11 +145,11 @@ pub async fn get_thought_handler( security(("bearer_auth" = [])) )] pub async fn delete_thought_handler( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(id): Path, ) -> Result { - delete_thought(&*s.thoughts, &*s.events, &*s.outbox, &ThoughtId::from_uuid(id), &uid).await?; + delete_thought(&*d.thoughts, &*d.events, &*d.outbox, &ThoughtId::from_uuid(id), &uid).await?; Ok(StatusCode::NO_CONTENT) } @@ -141,14 +165,14 @@ pub async fn delete_thought_handler( security(("bearer_auth" = [])) )] pub async fn patch_thought( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Path(id): Path, Json(body): Json, ) -> Result { edit_thought( - &*s.thoughts, - &*s.events, + &*d.thoughts, + &*d.events, &ThoughtId::from_uuid(id), &uid, body.content, @@ -165,13 +189,13 @@ pub async fn patch_thought( ) )] pub async fn get_thread_handler( - State(s): State, + Deps(d): Deps, Path(id): Path, ) -> Result>, ApiError> { - let thoughts = get_thread(&*s.thoughts, &ThoughtId::from_uuid(id)).await?; + let thoughts = get_thread(&*d.thoughts, &ThoughtId::from_uuid(id)).await?; let mut items = Vec::new(); for t in &thoughts { - if let Ok(Some(author)) = s.users.find_by_id(&t.user_id).await { + if let Ok(Some(author)) = d.users.find_by_id(&t.user_id).await { items.push(thought_to_json(t, &author, 0, 0, 0)); } } diff --git a/crates/presentation/src/handlers/users.rs b/crates/presentation/src/handlers/users.rs index e345c97..eb0b539 100644 --- a/crates/presentation/src/handlers/users.rs +++ b/crates/presentation/src/handlers/users.rs @@ -1,6 +1,6 @@ use crate::{ errors::ApiError, - extractors::{AuthUser, OptionalAuthUser}, + extractors::{AuthUser, Deps, FromAppState, OptionalAuthUser}, handlers::auth::to_user_response, state::AppState, }; @@ -13,11 +13,35 @@ use application::use_cases::profile::{ get_user as fetch_user, get_user_by_id_or_username, update_profile, }; use axum::{ - extract::{Path, Query, State}, + extract::{Path, Query}, http::{header, HeaderMap}, response::{IntoResponse, Response}, Json, }; +use domain::ports::{ + EventPublisher, FederationActionPort, FollowRepository, SearchPort, UserRepository, +}; +use std::sync::Arc; + +pub struct UsersDeps { + pub users: Arc, + pub events: Arc, + pub follows: Arc, + pub federation: Arc, + pub search: Arc, +} + +impl FromAppState for UsersDeps { + fn from_state(s: &AppState) -> Self { + Self { + users: s.users.clone(), + events: s.events.clone(), + follows: s.follows.clone(), + federation: s.federation.clone(), + search: s.search.clone(), + } + } +} #[utoipa::path( get, path = "/users/{username}", @@ -28,12 +52,12 @@ use axum::{ ) )] pub async fn get_user( - State(s): State, + Deps(d): Deps, Path(username): Path, OptionalAuthUser(viewer): OptionalAuthUser, headers: HeaderMap, ) -> Result { - let user = get_user_by_id_or_username(&*s.users, &username).await?; + let user = get_user_by_id_or_username(&*d.users, &username).await?; let accept = headers .get(header::ACCEPT) @@ -41,11 +65,11 @@ pub async fn get_user( .unwrap_or(""); if accept.contains("application/activity+json") { - let json = s.federation.actor_json(&user.id).await?; + let json = d.federation.actor_json(&user.id).await?; Ok(([(header::CONTENT_TYPE, "application/activity+json")], json).into_response()) } else { let is_followed = if let Some(viewer_id) = viewer { - s.follows.find(&viewer_id, &user.id).await?.is_some() + d.follows.find(&viewer_id, &user.id).await?.is_some() } else { false }; @@ -65,13 +89,13 @@ pub async fn get_user( security(("bearer_auth" = [])) )] pub async fn patch_profile( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, ) -> Result, ApiError> { update_profile( - &*s.users, - &*s.events, + &*d.users, + &*d.events, &uid, body.display_name, body.bio, @@ -80,7 +104,7 @@ pub async fn patch_profile( body.custom_css, ) .await?; - let user = fetch_user(&*s.users, &uid).await?; + let user = fetch_user(&*d.users, &uid).await?; Ok(Json(to_user_response(&user))) } @@ -93,15 +117,15 @@ pub async fn patch_profile( security(("bearer_auth" = [])) )] pub async fn get_me( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, ) -> Result, ApiError> { - let user = fetch_user(&*s.users, &uid).await?; + let user = fetch_user(&*d.users, &uid).await?; Ok(Json(to_user_response(&user))) } pub async fn get_me_following( - State(s): State, + Deps(d): Deps, AuthUser(uid): AuthUser, Query(q): Query, ) -> Result, ApiError> { @@ -111,7 +135,7 @@ pub async fn get_me_following( page: q.page(), per_page: q.per_page(), }; - let result = get_following(&*s.follows, &uid, page).await?; + let result = get_following(&*d.follows, &uid, page).await?; Ok(Json(serde_json::json!({ "total": result.total, "items": result.items.iter().map(to_user_response).collect::>(), @@ -119,7 +143,7 @@ pub async fn get_me_following( } pub async fn get_users( - State(s): State, + Deps(d): Deps, Query(params): Query>, ) -> Result, ApiError> { use domain::models::feed::PageParams; @@ -134,7 +158,7 @@ pub async fn get_users( let page_params = PageParams { page, per_page }; if let Some(q) = params.get("q").filter(|q| !q.trim().is_empty()) { - let result = s.search.search_users(q, &page_params).await?; + let result = d.search.search_users(q, &page_params).await?; let users: Vec<_> = result .items .iter() @@ -145,7 +169,7 @@ pub async fn get_users( }))); } - let result = list_users_paginated(&*s.users, page_params).await?; + let result = list_users_paginated(&*d.users, page_params).await?; let items: Vec<_> = result .items .iter() @@ -170,9 +194,9 @@ pub async fn get_users( } pub async fn get_user_count( - State(s): State, + Deps(d): Deps, ) -> Result, ApiError> { - let count = s.users.count().await?; + let count = d.users.count().await?; Ok(Json(serde_json::json!({ "count": count }))) } @@ -182,10 +206,10 @@ pub struct LookupQuery { } pub async fn lookup_handler( - State(s): State, + Deps(d): Deps, Query(q): Query, ) -> Result, ApiError> { - let actor = s.federation.lookup_actor(&q.handle).await?; + let actor = d.federation.lookup_actor(&q.handle).await?; Ok(Json(RemoteActorResponse { handle: actor.handle, display_name: actor.display_name, diff --git a/crates/presentation/src/lib.rs b/crates/presentation/src/lib.rs index 1d38562..767a350 100644 --- a/crates/presentation/src/lib.rs +++ b/crates/presentation/src/lib.rs @@ -6,3 +6,5 @@ pub mod routes; pub mod state; #[cfg(test)] pub mod testing; + +pub use extractors::{Deps, FromAppState}; diff --git a/crates/presentation/src/state.rs b/crates/presentation/src/state.rs index 7f77a91..2ee7bfa 100644 --- a/crates/presentation/src/state.rs +++ b/crates/presentation/src/state.rs @@ -1,3 +1,4 @@ +use activitypub_base::ActivityPubRepository; use domain::ports::*; use std::sync::Arc; diff --git a/crates/presentation/src/testing.rs b/crates/presentation/src/testing.rs index d7401e5..1e801f9 100644 --- a/crates/presentation/src/testing.rs +++ b/crates/presentation/src/testing.rs @@ -1,10 +1,11 @@ use crate::state::AppState; +use activitypub_base::{ActivityPubRepository, ActorApUrls, OutboxEntry}; use async_trait::async_trait; use domain::{ errors::DomainError, ports::{AuthService, GeneratedToken, PasswordHasher}, testing::{NoOpOutboxWriter, TestStore}, - value_objects::{PasswordHash, UserId}, + value_objects::{PasswordHash, ThoughtId, UserId}, }; use std::sync::Arc; @@ -29,6 +30,85 @@ impl PasswordHasher for NoOpHasher { } } +/// No-op ActivityPubRepository for presentation layer tests. +pub struct NoOpApRepo; + +#[async_trait] +impl ActivityPubRepository for NoOpApRepo { + async fn outbox_entries_for_actor( + &self, + _uid: &UserId, + ) -> Result, DomainError> { + Ok(vec![]) + } + async fn outbox_page_for_actor( + &self, + _uid: &UserId, + _before: Option>, + _limit: usize, + ) -> Result, DomainError> { + Ok(vec![]) + } + async fn find_remote_actor_id( + &self, + _actor_ap_url: &str, + ) -> Result, DomainError> { + Ok(None) + } + async fn intern_remote_actor(&self, _actor_ap_url: &str) -> Result { + Err(DomainError::NotFound) + } + async fn update_remote_actor_display( + &self, + _user_id: &UserId, + _display_name: Option<&str>, + _avatar_url: Option<&str>, + ) -> Result<(), DomainError> { + Ok(()) + } + async fn accept_note( + &self, + _ap_id: &str, + _author_id: &UserId, + _content: &str, + _published: chrono::DateTime, + _sensitive: bool, + _content_warning: Option, + _visibility: &str, + _in_reply_to: Option<&str>, + ) -> Result<(), DomainError> { + Ok(()) + } + async fn apply_note_update( + &self, + _ap_id: &str, + _new_content: &str, + ) -> Result<(), DomainError> { + Ok(()) + } + async fn retract_note(&self, _ap_id: &str) -> Result<(), DomainError> { + Ok(()) + } + async fn retract_actor_notes(&self, _actor_ap_url: &str) -> Result<(), DomainError> { + Ok(()) + } + async fn count_local_notes(&self) -> Result { + Ok(0) + } + async fn get_thought_ap_id( + &self, + _thought_id: &ThoughtId, + ) -> Result, DomainError> { + Ok(None) + } + async fn get_actor_ap_urls( + &self, + _user_id: &UserId, + ) -> Result, DomainError> { + Ok(None) + } +} + pub fn make_state() -> AppState { let store = Arc::new(TestStore::default()); AppState { @@ -50,7 +130,7 @@ pub fn make_state() -> AppState { events: store.clone(), outbox: Arc::new(NoOpOutboxWriter), federation: store.clone(), - ap_repo: store.clone(), + ap_repo: Arc::new(NoOpApRepo), remote_actor_connections: store.clone(), federation_scheduler: store.clone(), } diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index f209a78..95b279c 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -5,7 +5,8 @@ use std::sync::Arc; use activitypub::ThoughtsObjectHandler; use activitypub_base::ActivityPubService; use application::services::{FederationEventService, NotificationEventService}; -use domain::ports::{ActivityPubRepository, EventPublisher, OutboundFederationPort}; +use activitypub_base::{ActivityPubRepository, OutboundFederationPort}; +use domain::ports::EventPublisher; use postgres::activitypub::PgActivityPubRepository; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; diff --git a/crates/worker/src/outbox_relay.rs b/crates/worker/src/outbox_relay.rs index ada92d6..d8fd0ff 100644 --- a/crates/worker/src/outbox_relay.rs +++ b/crates/worker/src/outbox_relay.rs @@ -27,25 +27,47 @@ impl OutboxRelay { } } + // NOTE: thoughts.save() and outbox.append() are not in the same DB transaction + // (known architectural limitation — fixing requires transaction-sharing between + // repositories, a larger refactor). async fn process_batch(&self) -> Result<(), sqlx::Error> { - let rows = sqlx::query_as::<_, OutboxRow>( - "SELECT seq, event_type, payload \ - FROM outbox_events \ - WHERE delivered = false \ - ORDER BY seq ASC \ - LIMIT 100 \ - FOR UPDATE SKIP LOCKED", - ) - .fetch_all(&self.pool) - .await?; + // Process one row at a time inside its own transaction so that + // FOR UPDATE SKIP LOCKED actually holds the lock for the duration + // of publish + mark_delivered. A batch SELECT without a surrounding + // transaction releases locks immediately after autocommit. + loop { + let mut tx = self.pool.begin().await?; + + let row = sqlx::query_as::<_, OutboxRow>( + "SELECT seq, event_type, payload \ + FROM outbox_events \ + WHERE delivered = false \ + ORDER BY seq ASC \ + LIMIT 1 \ + FOR UPDATE SKIP LOCKED", + ) + .fetch_optional(&mut *tx) + .await?; + + let Some(row) = row else { + tx.rollback().await?; + break; + }; - for row in rows { let payload: EventPayload = match serde_json::from_value(row.payload.clone()) { Ok(p) => p, Err(e) => { tracing::error!(seq = row.seq, event_type = row.event_type, "outbox: failed to deserialize payload: {e}"); // Mark delivered to avoid blocking; investigate manually. - self.mark_delivered(row.seq).await?; + sqlx::query( + "UPDATE outbox_events \ + SET delivered = true, delivered_at = now() \ + WHERE seq = $1", + ) + .bind(row.seq) + .execute(&mut *tx) + .await?; + tx.commit().await?; continue; } }; @@ -54,35 +76,39 @@ impl OutboxRelay { Ok(ev) => ev, Err(e) => { tracing::error!(seq = row.seq, "outbox: failed to convert to DomainEvent: {e}"); - self.mark_delivered(row.seq).await?; + sqlx::query( + "UPDATE outbox_events \ + SET delivered = true, delivered_at = now() \ + WHERE seq = $1", + ) + .bind(row.seq) + .execute(&mut *tx) + .await?; + tx.commit().await?; continue; } }; match self.publisher.publish(&domain_event).await { Ok(()) => { - self.mark_delivered(row.seq).await?; + sqlx::query( + "UPDATE outbox_events \ + SET delivered = true, delivered_at = now() \ + WHERE seq = $1", + ) + .bind(row.seq) + .execute(&mut *tx) + .await?; + tx.commit().await?; tracing::debug!(seq = row.seq, event_type = row.event_type, "outbox: delivered"); } Err(e) => { tracing::warn!(seq = row.seq, "outbox: publish failed (will retry): {e}"); - // Leave delivered=false — will be retried next poll. + tx.rollback().await?; // row stays undelivered, retried next poll } } } Ok(()) } - - async fn mark_delivered(&self, seq: i64) -> Result<(), sqlx::Error> { - sqlx::query( - "UPDATE outbox_events \ - SET delivered = true, delivered_at = now() \ - WHERE seq = $1", - ) - .bind(seq) - .execute(&self.pool) - .await?; - Ok(()) - } }