diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index a99a754..9aa10d0 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -11,3 +11,48 @@ pub use port::{ }; pub use service::ApFederationAdapter; pub use urls::ThoughtsUrls; + +use domain::ports::RemoteActorConnectionRepository; +use k_ap::ActivityPubService; +use std::sync::Arc; + +pub struct ApServiceConfig { + pub base_url: String, + pub activity_repo: Arc, + pub follow_repo: Arc, + pub actor_repo: Arc, + pub blocklist_repo: Arc, + pub user_repo: Arc, + pub ap_handler: Arc, + pub connections_repo: Arc, + pub event_publisher: Option>, + pub allow_registration: bool, + pub debug: bool, +} + +pub async fn build_ap_service( + cfg: ApServiceConfig, +) -> (Arc, Arc) { + let mut builder = ActivityPubService::builder(cfg.base_url) + .activity_repo(cfg.activity_repo) + .follow_repo(cfg.follow_repo) + .actor_repo(cfg.actor_repo) + .blocklist_repo(cfg.blocklist_repo) + .user_repo(cfg.user_repo) + .content_reader(cfg.ap_handler.clone()) + .object_handler(cfg.ap_handler) + .allow_registration(cfg.allow_registration) + .software_name("thoughts") + .debug(cfg.debug); + if let Some(publisher) = cfg.event_publisher { + builder = builder.event_publisher(publisher); + } + let raw = Arc::new( + builder + .build() + .await + .expect("Failed to build ActivityPubService"), + ); + let adapter = Arc::new(ApFederationAdapter::new(raw.clone(), cfg.connections_repo)); + (raw, adapter) +} diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index 38a50e6..77d3a46 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -1,8 +1,17 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; use sqlx::PgPool; +trait IntoAnyhow { + fn into_anyhow(self) -> Result; +} +impl IntoAnyhow for std::result::Result { + fn into_anyhow(self) -> Result { + self.map_err(|e| anyhow::anyhow!(e)) + } +} + use k_ap::{ ActivityRepository, ActorRepository, ApActorType, ApUser, ApUserRepository, BlockedDomain, BlocklistRepository, FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, @@ -10,11 +19,11 @@ use k_ap::{ // ── PostgresFederationRepository ───────────────────────────────────────────── -pub struct PostgresFederationRepository { +pub struct PgFederationRepository { pool: PgPool, } -impl PostgresFederationRepository { +impl PgFederationRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } @@ -71,7 +80,7 @@ fn map_remote_actor(r: RemoteActorRow) -> RemoteActor { // ── ActivityRepository ──────────────────────────────────────────────────────── #[async_trait] -impl ActivityRepository for PostgresFederationRepository { +impl ActivityRepository for PgFederationRepository { async fn is_activity_processed(&self, activity_id: &str) -> Result { let n: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM federation_processed_activities WHERE activity_id=$1", @@ -79,7 +88,7 @@ impl ActivityRepository for PostgresFederationRepository { .bind(activity_id) .fetch_one(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(n > 0) } @@ -90,7 +99,7 @@ impl ActivityRepository for PostgresFederationRepository { .bind(activity_id) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } } @@ -98,7 +107,7 @@ impl ActivityRepository for PostgresFederationRepository { // ── FollowRepository ────────────────────────────────────────────────────────── #[async_trait] -impl FollowRepository for PostgresFederationRepository { +impl FollowRepository for PgFederationRepository { async fn add_follower( &self, local_user_id: uuid::Uuid, @@ -118,7 +127,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(follow_activity_id) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -134,7 +143,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(remote_actor_url) .fetch_optional(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() } async fn remove_follower( @@ -149,7 +158,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(remote_actor_url) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -172,7 +181,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(local_user_id) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|rows| { rows.into_iter() .map(|r| Follower { @@ -210,7 +219,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(offset as i64) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|rows| { rows.into_iter() .map(|r| Follower { @@ -227,7 +236,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(local_user_id) .fetch_one(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(n as usize) } @@ -238,7 +247,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(local_user_id) .fetch_one(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(n as usize) } @@ -262,7 +271,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(offset as i64) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|rows| rows.into_iter().map(map_remote_actor).collect()) } @@ -287,7 +296,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(local_user_id) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(rows) } @@ -303,7 +312,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(local_user_id) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|rows| rows.into_iter().map(map_remote_actor).collect()) } @@ -321,7 +330,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(status_str(&status)) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -344,7 +353,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(&actor.outbox_url) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -360,7 +369,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(remote_actor_url) .fetch_optional(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() } async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { @@ -371,7 +380,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(actor_url) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -387,7 +396,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(local_user_id) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|rows| rows.into_iter().map(map_remote_actor).collect()) } @@ -411,7 +420,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(offset as i64) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|rows| rows.into_iter().map(map_remote_actor).collect()) } @@ -421,7 +430,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(local_user_id) .fetch_one(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(n as usize) } @@ -443,7 +452,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(s) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -459,7 +468,7 @@ impl FollowRepository for PostgresFederationRepository { .bind(remote_actor_url) .fetch_optional(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() } async fn migrate_follower_actor( @@ -467,7 +476,7 @@ impl FollowRepository for PostgresFederationRepository { old_actor_url: &str, new_actor_url: &str, ) -> Result> { - let mut tx = self.pool.begin().await.map_err(|e| anyhow!(e))?; + let mut tx = self.pool.begin().await.into_anyhow()?; let affected: Vec = sqlx::query_scalar( "INSERT INTO federation_following(local_user_id, remote_actor_url, follow_activity_id, outbox_url) @@ -481,15 +490,15 @@ impl FollowRepository for PostgresFederationRepository { .bind(new_actor_url) .fetch_all(&mut *tx) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; sqlx::query("DELETE FROM federation_following WHERE remote_actor_url = $1") .bind(old_actor_url) .execute(&mut *tx) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; - tx.commit().await.map_err(|e| anyhow!(e))?; + tx.commit().await.into_anyhow()?; Ok(affected) } } @@ -497,7 +506,7 @@ impl FollowRepository for PostgresFederationRepository { // ── ActorRepository ─────────────────────────────────────────────────────────── #[async_trait] -impl ActorRepository for PostgresFederationRepository { +impl ActorRepository for PgFederationRepository { async fn get_local_actor_keypair( &self, user_id: uuid::Uuid, @@ -513,7 +522,7 @@ impl ActorRepository for PostgresFederationRepository { .bind(user_id) .fetch_optional(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(row.and_then(|r| match (r.public_key, r.private_key) { (Some(pub_k), Some(priv_k)) => Some((pub_k, priv_k)), _ => None, @@ -532,7 +541,7 @@ impl ActorRepository for PostgresFederationRepository { .bind(&private_key) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -568,7 +577,7 @@ impl ActorRepository for PostgresFederationRepository { .bind(also_known_as.as_deref()) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -581,7 +590,7 @@ impl ActorRepository for PostgresFederationRepository { .bind(actor_url) .fetch_optional(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|o| o.map(map_remote_actor)) } @@ -602,7 +611,7 @@ impl ActorRepository for PostgresFederationRepository { .bind(announced_at) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -612,7 +621,7 @@ impl ActorRepository for PostgresFederationRepository { .bind(actor_url) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -622,7 +631,7 @@ impl ActorRepository for PostgresFederationRepository { .bind(object_url) .fetch_one(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(n as usize) } } @@ -630,7 +639,7 @@ impl ActorRepository for PostgresFederationRepository { // ── BlocklistRepository ─────────────────────────────────────────────────────── #[async_trait] -impl BlocklistRepository for PostgresFederationRepository { +impl BlocklistRepository for PgFederationRepository { async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()> { sqlx::query( "INSERT INTO federation_blocked_domains(domain,reason) VALUES($1,$2) ON CONFLICT(domain) DO NOTHING", @@ -639,7 +648,7 @@ impl BlocklistRepository for PostgresFederationRepository { .bind(reason) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -648,7 +657,7 @@ impl BlocklistRepository for PostgresFederationRepository { .bind(domain) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -664,7 +673,7 @@ impl BlocklistRepository for PostgresFederationRepository { ) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|rows| { rows.into_iter() .map(|r| BlockedDomain { @@ -682,7 +691,7 @@ impl BlocklistRepository for PostgresFederationRepository { .bind(domain) .fetch_one(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(n > 0) } @@ -694,7 +703,7 @@ impl BlocklistRepository for PostgresFederationRepository { .bind(actor_url) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -704,7 +713,7 @@ impl BlocklistRepository for PostgresFederationRepository { .bind(actor_url) .execute(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() .map(|_| ()) } @@ -715,7 +724,7 @@ impl BlocklistRepository for PostgresFederationRepository { .bind(local_user_id) .fetch_all(&self.pool) .await - .map_err(|e| anyhow!(e)) + .into_anyhow() } async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result { @@ -726,7 +735,7 @@ impl BlocklistRepository for PostgresFederationRepository { .bind(actor_url) .fetch_one(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(n > 0) } } @@ -744,12 +753,12 @@ struct UserRow { also_known_as: Option, } -pub struct PostgresApUserRepository { +pub struct PgApUserRepository { pool: PgPool, base_url: String, } -impl PostgresApUserRepository { +impl PgApUserRepository { pub fn new(pool: PgPool, base_url: String) -> Self { Self { pool, base_url } } @@ -777,7 +786,7 @@ impl PostgresApUserRepository { } #[async_trait] -impl ApUserRepository for PostgresApUserRepository { +impl ApUserRepository for PgApUserRepository { async fn find_by_id(&self, id: uuid::Uuid) -> Result> { let row = sqlx::query_as::<_, UserRow>( "SELECT id,username,display_name,bio,avatar_url,header_url,also_known_as FROM users WHERE id=$1 AND local=true", @@ -785,7 +794,7 @@ impl ApUserRepository for PostgresApUserRepository { .bind(id) .fetch_optional(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(row.map(|r| self.row_to_ap_user(r))) } @@ -796,7 +805,7 @@ impl ApUserRepository for PostgresApUserRepository { .bind(username) .fetch_optional(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(row.map(|r| self.row_to_ap_user(r))) } @@ -804,7 +813,7 @@ impl ApUserRepository for PostgresApUserRepository { let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE local=true") .fetch_one(&self.pool) .await - .map_err(|e| anyhow!(e))?; + .into_anyhow()?; Ok(n as usize) } } diff --git a/crates/adapters/postgres/src/activitypub/mod.rs b/crates/adapters/postgres/src/activitypub/mod.rs index 111aaf1..c783b69 100644 --- a/crates/adapters/postgres/src/activitypub/mod.rs +++ b/crates/adapters/postgres/src/activitypub/mod.rs @@ -13,6 +13,40 @@ use domain::{ value_objects::{Content, ThoughtId, UserId, Username}, }; +#[derive(sqlx::FromRow)] +struct OutboxRow { + id: uuid::Uuid, + user_id: uuid::Uuid, + content: String, + created_at: DateTime, + in_reply_to_id: Option, + content_warning: Option, + sensitive: bool, + username: String, + updated_at: Option>, +} + +impl OutboxRow { + fn into_entry(self) -> OutboxEntry { + OutboxEntry { + thought: Thought { + id: ThoughtId::from_uuid(self.id), + user_id: UserId::from_uuid(self.user_id), + content: Content::new_remote(self.content), + in_reply_to_id: self.in_reply_to_id.map(ThoughtId::from_uuid), + visibility: Visibility::Public, + content_warning: self.content_warning, + sensitive: self.sensitive, + local: true, + created_at: self.created_at, + updated_at: self.updated_at, + note_extensions: None, + }, + author_username: Username::from_trusted(self.username), + } + } +} + pub struct PgActivityPubRepository { pool: PgPool, } @@ -29,19 +63,7 @@ impl ActivityPubRepository for PgActivityPubRepository { &self, user_id: &UserId, ) -> Result, DomainError> { - #[derive(sqlx::FromRow)] - struct Row { - id: uuid::Uuid, - user_id: uuid::Uuid, - content: String, - created_at: DateTime, - in_reply_to_id: Option, - content_warning: Option, - sensitive: bool, - username: String, - updated_at: Option>, - } - sqlx::query_as::<_, Row>( + sqlx::query_as::<_, OutboxRow>( "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at FROM thoughts t JOIN users u ON u.id=t.user_id WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' @@ -51,26 +73,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .fetch_all(&self.pool) .await .into_domain() - .map(|rows| { - rows.into_iter() - .map(|r| OutboxEntry { - thought: Thought { - id: ThoughtId::from_uuid(r.id), - user_id: UserId::from_uuid(r.user_id), - content: Content::new_remote(r.content), - in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), - visibility: Visibility::Public, - content_warning: r.content_warning, - sensitive: r.sensitive, - local: true, - created_at: r.created_at, - updated_at: r.updated_at, - note_extensions: None, - }, - author_username: Username::from_trusted(r.username), - }) - .collect() - }) + .map(|rows| rows.into_iter().map(OutboxRow::into_entry).collect()) } async fn outbox_page_for_actor( @@ -79,20 +82,8 @@ impl ActivityPubRepository for PgActivityPubRepository { before: Option>, limit: usize, ) -> Result, DomainError> { - #[derive(sqlx::FromRow)] - struct Row { - id: uuid::Uuid, - user_id: uuid::Uuid, - content: String, - created_at: DateTime, - in_reply_to_id: Option, - content_warning: Option, - sensitive: bool, - username: String, - updated_at: Option>, - } let rows = if let Some(before) = before { - sqlx::query_as::<_, Row>( + sqlx::query_as::<_, OutboxRow>( "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at FROM thoughts t JOIN users u ON u.id=t.user_id WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2 @@ -104,7 +95,7 @@ impl ActivityPubRepository for PgActivityPubRepository { .fetch_all(&self.pool) .await } else { - sqlx::query_as::<_, Row>( + sqlx::query_as::<_, OutboxRow>( "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at FROM thoughts t JOIN users u ON u.id=t.user_id WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' @@ -117,25 +108,7 @@ impl ActivityPubRepository for PgActivityPubRepository { } .into_domain()?; - Ok(rows - .into_iter() - .map(|r| OutboxEntry { - thought: Thought { - id: ThoughtId::from_uuid(r.id), - user_id: UserId::from_uuid(r.user_id), - content: Content::new_remote(r.content), - in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), - visibility: Visibility::Public, - content_warning: r.content_warning, - sensitive: r.sensitive, - local: true, - created_at: r.created_at, - updated_at: r.updated_at, - note_extensions: None, - }, - author_username: Username::from_trusted(r.username), - }) - .collect()) + Ok(rows.into_iter().map(OutboxRow::into_entry).collect()) } async fn find_remote_actor_id( diff --git a/crates/adapters/postgres/src/api_key/mod.rs b/crates/adapters/postgres/src/api_key/mod.rs index e9e0b87..5f872bd 100644 --- a/crates/adapters/postgres/src/api_key/mod.rs +++ b/crates/adapters/postgres/src/api_key/mod.rs @@ -9,6 +9,27 @@ use domain::{ }; use sqlx::PgPool; +#[derive(sqlx::FromRow)] +struct ApiKeyRow { + id: uuid::Uuid, + user_id: uuid::Uuid, + key_hash: String, + name: String, + created_at: DateTime, +} + +impl ApiKeyRow { + fn into_domain(self) -> ApiKey { + ApiKey { + id: ApiKeyId::from_uuid(self.id), + user_id: UserId::from_uuid(self.user_id), + key_hash: self.key_hash, + name: self.name, + created_at: self.created_at, + } + } +} + pub struct PgApiKeyRepository { pool: PgPool, } @@ -36,45 +57,21 @@ impl ApiKeyRepository for PgApiKeyRepository { } async fn find_by_hash(&self, hash: &str) -> Result, DomainError> { - #[derive(sqlx::FromRow)] - struct Row { - id: uuid::Uuid, - user_id: uuid::Uuid, - key_hash: String, - name: String, - created_at: DateTime, - } - sqlx::query_as::<_, Row>( + sqlx::query_as::<_, ApiKeyRow>( "SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE key_hash=$1", ) .bind(hash) .fetch_optional(&self.pool) .await .into_domain() - .map(|o| { - o.map(|r| ApiKey { - id: ApiKeyId::from_uuid(r.id), - user_id: UserId::from_uuid(r.user_id), - key_hash: r.key_hash, - name: r.name, - created_at: r.created_at, - }) - }) + .map(|o| o.map(ApiKeyRow::into_domain)) } async fn list_for_user(&self, user_id: &UserId) -> Result, DomainError> { - #[derive(sqlx::FromRow)] - struct Row { - id: uuid::Uuid, - user_id: uuid::Uuid, - key_hash: String, - name: String, - created_at: DateTime, - } - sqlx::query_as::<_, Row>("SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE user_id=$1 ORDER BY created_at DESC") + sqlx::query_as::<_, ApiKeyRow>("SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE user_id=$1 ORDER BY created_at DESC") .bind(user_id.as_uuid()).fetch_all(&self.pool).await .into_domain() - .map(|rows| rows.into_iter().map(|r| ApiKey { id: ApiKeyId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), key_hash: r.key_hash, name: r.name, created_at: r.created_at }).collect()) + .map(|rows| rows.into_iter().map(ApiKeyRow::into_domain).collect()) } async fn delete(&self, id: &ApiKeyId, user_id: &UserId) -> Result<(), DomainError> { diff --git a/crates/adapters/postgres/src/constants.rs b/crates/adapters/postgres/src/constants.rs new file mode 100644 index 0000000..a0fd1f7 --- /dev/null +++ b/crates/adapters/postgres/src/constants.rs @@ -0,0 +1,8 @@ +pub const STATUS_ACCEPTED: &str = "accepted"; +pub const STATUS_PENDING: &str = "pending"; +pub const STATUS_REJECTED: &str = "rejected"; + +pub const VIS_PUBLIC: &str = "public"; +pub const VIS_UNLISTED: &str = "unlisted"; +pub const VIS_FOLLOWERS: &str = "followers"; +pub const VIS_DIRECT: &str = "direct"; diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 2d86fdb..28087e9 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -2,6 +2,7 @@ pub mod activitypub; pub mod api_key; pub mod block; pub mod boost; +pub mod constants; mod db_error; pub mod engagement; pub mod failed_event; diff --git a/crates/adapters/postgres/src/tag/mod.rs b/crates/adapters/postgres/src/tag/mod.rs index e93a8d2..2dfaa8f 100644 --- a/crates/adapters/postgres/src/tag/mod.rs +++ b/crates/adapters/postgres/src/tag/mod.rs @@ -12,6 +12,12 @@ use domain::{ }; use sqlx::PgPool; +#[derive(sqlx::FromRow)] +struct TagRow { + id: i32, + name: String, +} + pub struct PgTagRepository { pool: PgPool, } @@ -30,12 +36,7 @@ impl TagRepository for PgTagRepository { .execute(&self.pool) .await .into_domain()?; - #[derive(sqlx::FromRow)] - struct Row { - id: i32, - name: String, - } - let row = sqlx::query_as::<_, Row>("SELECT id,name FROM tags WHERE name=$1") + let row = sqlx::query_as::<_, TagRow>("SELECT id,name FROM tags WHERE name=$1") .bind(&name) .fetch_one(&self.pool) .await @@ -72,12 +73,7 @@ impl TagRepository for PgTagRepository { } async fn list_for_thought(&self, thought_id: &ThoughtId) -> Result, DomainError> { - #[derive(sqlx::FromRow)] - struct Row { - id: i32, - name: String, - } - sqlx::query_as::<_, Row>( + sqlx::query_as::<_, TagRow>( "SELECT t.id,t.name FROM tags t JOIN thought_tags tt ON tt.tag_id=t.id WHERE tt.thought_id=$1" ).bind(thought_id.as_uuid()).fetch_all(&self.pool).await .into_domain() diff --git a/crates/adapters/postgres/src/top_friend/mod.rs b/crates/adapters/postgres/src/top_friend/mod.rs index 769553a..313fd1e 100644 --- a/crates/adapters/postgres/src/top_friend/mod.rs +++ b/crates/adapters/postgres/src/top_friend/mod.rs @@ -44,24 +44,14 @@ impl TopFriendRepository for PgTopFriendRepository { async fn list_for_user(&self, user_id: &UserId) -> Result, DomainError> { #[derive(sqlx::FromRow)] - struct Row { + struct TopFriendRow { tf_user_id: uuid::Uuid, friend_id: uuid::Uuid, position: i16, - id: uuid::Uuid, - username: String, - email: String, - password_hash: String, - display_name: Option, - bio: Option, - avatar_url: Option, - header_url: Option, - custom_css: Option, - local: bool, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, + #[sqlx(flatten)] + user: crate::user::UserRow, } - let rows = sqlx::query_as::<_, Row>( + let rows = sqlx::query_as::<_, TopFriendRow>( "SELECT tf.user_id AS tf_user_id, tf.friend_id, tf.position, u.id, u.username, u.email, u.password_hash, u.display_name, u.bio, u.avatar_url, u.header_url, u.custom_css, u.local, @@ -77,27 +67,12 @@ impl TopFriendRepository for PgTopFriendRepository { Ok(rows .into_iter() .map(|r| { - use domain::value_objects::{Email, PasswordHash, Username}; let tf = TopFriend { user_id: UserId::from_uuid(r.tf_user_id), friend_id: UserId::from_uuid(r.friend_id), position: r.position, }; - let u = User { - id: UserId::from_uuid(r.id), - username: Username::from_trusted(r.username), - email: Email::from_trusted(r.email), - password_hash: PasswordHash(r.password_hash), - display_name: r.display_name, - bio: r.bio, - avatar_url: r.avatar_url, - header_url: r.header_url, - custom_css: r.custom_css, - local: r.local, - created_at: r.created_at, - updated_at: r.updated_at, - }; - (tf, u) + (tf, User::from(r.user)) }) .collect()) } diff --git a/crates/api-types/src/responses.rs b/crates/api-types/src/responses.rs index f145c3f..db64a95 100644 --- a/crates/api-types/src/responses.rs +++ b/crates/api-types/src/responses.rs @@ -83,6 +83,13 @@ pub struct TopFriendsResponse { pub top_friends: Vec, } +#[derive(Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct NotificationSummaryResponse { + pub total: i64, + pub unread: u64, +} + #[derive(Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct ErrorResponse { diff --git a/crates/application/src/services/federation_event/mod.rs b/crates/application/src/services/federation_event/mod.rs index 0f7feca..4383ea6 100644 --- a/crates/application/src/services/federation_event/mod.rs +++ b/crates/application/src/services/federation_event/mod.rs @@ -8,6 +8,10 @@ use domain::{ }; use std::sync::Arc; +fn should_broadcast(t: &domain::models::thought::Thought) -> bool { + t.local && matches!(t.visibility, Visibility::Public | Visibility::Unlisted) +} + pub struct FederationEventService { pub thoughts: Arc, pub users: Arc, @@ -32,15 +36,7 @@ impl FederationEventService { .. } => { let thought = match self.thoughts.find_by_id(thought_id).await? { - Some(t) - if t.local - && matches!( - t.visibility, - Visibility::Public | Visibility::Unlisted - ) => - { - t - } + Some(t) if should_broadcast(&t) => t, _ => { tracing::debug!(thought_id = %thought_id, "federation: skipping ThoughtCreated (remote or non-public)"); return Ok(()); @@ -86,15 +82,7 @@ impl FederationEventService { user_id, } => { let thought = match self.thoughts.find_by_id(thought_id).await? { - Some(t) - if t.local - && matches!( - t.visibility, - Visibility::Public | Visibility::Unlisted - ) => - { - t - } + Some(t) if should_broadcast(&t) => t, _ => return Ok(()), }; let user = match self.users.find_by_id(user_id).await? { @@ -125,14 +113,10 @@ impl FederationEventService { user_id, thought_id, } => { - let booster = match self.users.find_by_id(user_id).await? { - Some(u) if u.local => u, - _ => { - tracing::debug!(user_id = %user_id, "federation: skipping BoostAdded (remote user)"); - return Ok(()); - } - }; - let _ = booster; + if !matches!(self.users.find_by_id(user_id).await?, Some(u) if u.local) { + tracing::debug!(user_id = %user_id, "federation: skipping BoostAdded (remote user)"); + return Ok(()); + } if self.thoughts.find_by_id(thought_id).await?.is_none() { return Ok(()); } @@ -160,14 +144,10 @@ impl FederationEventService { user_id, thought_id, } => { - let liker = match self.users.find_by_id(user_id).await? { - Some(u) if u.local => u, - _ => { - tracing::debug!(user_id = %user_id, "federation: skipping LikeAdded (remote user)"); - return Ok(()); - } - }; - let _ = liker; + if !matches!(self.users.find_by_id(user_id).await?, Some(u) if u.local) { + tracing::debug!(user_id = %user_id, "federation: skipping LikeAdded (remote user)"); + return Ok(()); + } let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) => t, _ => return Ok(()), @@ -193,11 +173,9 @@ impl FederationEventService { user_id, thought_id, } => { - let liker = match self.users.find_by_id(user_id).await? { - Some(u) if u.local => u, - _ => return Ok(()), - }; - let _ = liker; + if !matches!(self.users.find_by_id(user_id).await?, Some(u) if u.local) { + return Ok(()); + } let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) => t, _ => return Ok(()), diff --git a/crates/application/src/use_cases/profile/mod.rs b/crates/application/src/use_cases/profile/mod.rs index 30c9e15..b742a79 100644 --- a/crates/application/src/use_cases/profile/mod.rs +++ b/crates/application/src/use_cases/profile/mod.rs @@ -118,17 +118,25 @@ fn mime_to_ext(mime: &str) -> Result<&'static str, DomainError> { } } -#[allow(clippy::too_many_arguments)] +pub struct UploadContext<'a> { + pub users: &'a dyn UserRepository, + pub media: &'a dyn MediaStore, + pub events: &'a dyn EventPublisher, + pub upload_config: &'a UploadConfig, + pub base_url: &'a str, +} + async fn store_image( - media: &dyn MediaStore, - base_url: &str, - cfg: &UploadConfig, + ctx: &UploadContext<'_>, content_type: &str, data: Bytes, user_id: &UserId, key_segment: &str, old_url: Option<&str>, ) -> Result { + let cfg = ctx.upload_config; + let media = ctx.media; + let base_url = ctx.base_url; if !cfg.allowed_content_types.iter().any(|t| t == content_type) { return Err(DomainError::InvalidInput("unsupported content type".into())); } @@ -148,25 +156,19 @@ async fn store_image( Ok(key) } -#[allow(clippy::too_many_arguments)] pub async fn upload_avatar( - users: &dyn UserRepository, - media: &dyn MediaStore, - events: &dyn EventPublisher, + ctx: &UploadContext<'_>, user_id: &UserId, - base_url: &str, - cfg: &UploadConfig, content_type: &str, data: Bytes, ) -> Result<(), DomainError> { - let current = users + let current = ctx + .users .find_by_id(user_id) .await? .ok_or(DomainError::NotFound)?; let key = store_image( - media, - base_url, - cfg, + ctx, content_type, data, user_id, @@ -174,41 +176,35 @@ pub async fn upload_avatar( current.avatar_url.as_deref(), ) .await?; - users + ctx.users .update_profile( user_id, UpdateProfileInput { - avatar_url: Some(format!("{base_url}/media/{key}")), + avatar_url: Some(format!("{}/media/{key}", ctx.base_url)), ..Default::default() }, ) .await?; - events + ctx.events .publish(&DomainEvent::ProfileUpdated { user_id: user_id.clone(), }) .await } -#[allow(clippy::too_many_arguments)] pub async fn upload_banner( - users: &dyn UserRepository, - media: &dyn MediaStore, - events: &dyn EventPublisher, + ctx: &UploadContext<'_>, user_id: &UserId, - base_url: &str, - cfg: &UploadConfig, content_type: &str, data: Bytes, ) -> Result<(), DomainError> { - let current = users + let current = ctx + .users .find_by_id(user_id) .await? .ok_or(DomainError::NotFound)?; let key = store_image( - media, - base_url, - cfg, + ctx, content_type, data, user_id, @@ -216,16 +212,16 @@ pub async fn upload_banner( current.header_url.as_deref(), ) .await?; - users + ctx.users .update_profile( user_id, UpdateProfileInput { - header_url: Some(format!("{base_url}/media/{key}")), + header_url: Some(format!("{}/media/{key}", ctx.base_url)), ..Default::default() }, ) .await?; - events + ctx.events .publish(&DomainEvent::ProfileUpdated { user_id: user_id.clone(), }) diff --git a/crates/application/src/use_cases/profile/tests.rs b/crates/application/src/use_cases/profile/tests.rs index 913fefb..2d6f855 100644 --- a/crates/application/src/use_cases/profile/tests.rs +++ b/crates/application/src/use_cases/profile/tests.rs @@ -113,24 +113,31 @@ fn default_cfg() -> UploadConfig { UploadConfig::default() } +fn make_ctx<'a>( + store: &'a TestStore, + media: &'a MockMedia, + cfg: &'a UploadConfig, +) -> UploadContext<'a> { + UploadContext { + users: store, + media, + events: store, + upload_config: cfg, + base_url: "http://localhost", + } +} + #[tokio::test] async fn upload_avatar_rejects_unsupported_mime() { let store = TestStore::default(); let media = MockMedia::default(); let user = make_user(); store.users.lock().unwrap().push(user.clone()); - let err = upload_avatar( - &store, - &media, - &store, - &user.id, - "http://localhost", - &default_cfg(), - "text/plain", - Bytes::from("hi"), - ) - .await - .unwrap_err(); + let cfg = default_cfg(); + let ctx = make_ctx(&store, &media, &cfg); + let err = upload_avatar(&ctx, &user.id, "text/plain", Bytes::from("hi")) + .await + .unwrap_err(); assert!(matches!(err, DomainError::InvalidInput(_))); } @@ -141,18 +148,11 @@ async fn upload_avatar_rejects_oversized_data() { let user = make_user(); store.users.lock().unwrap().push(user.clone()); let big = Bytes::from(vec![0u8; 6 * 1024 * 1024]); - let err = upload_avatar( - &store, - &media, - &store, - &user.id, - "http://localhost", - &default_cfg(), - "image/jpeg", - big, - ) - .await - .unwrap_err(); + let cfg = default_cfg(); + let ctx = make_ctx(&store, &media, &cfg); + let err = upload_avatar(&ctx, &user.id, "image/jpeg", big) + .await + .unwrap_err(); assert!(matches!(err, DomainError::InvalidInput(_))); } @@ -162,18 +162,11 @@ async fn upload_avatar_stores_file_and_updates_url() { let media = MockMedia::default(); let user = make_user(); store.users.lock().unwrap().push(user.clone()); - upload_avatar( - &store, - &media, - &store, - &user.id, - "http://localhost", - &default_cfg(), - "image/jpeg", - Bytes::from("img"), - ) - .await - .unwrap(); + let cfg = default_cfg(); + let ctx = make_ctx(&store, &media, &cfg); + upload_avatar(&ctx, &user.id, "image/jpeg", Bytes::from("img")) + .await + .unwrap(); let key = format!("users/{}/avatar.jpg", user.id.as_uuid()); assert!(media.store.lock().unwrap().contains_key(&key)); let saved = store @@ -203,18 +196,11 @@ async fn upload_avatar_deletes_old_file_on_reupload() { .lock() .unwrap() .insert(old_key.clone(), Bytes::from("old")); - upload_avatar( - &store, - &media, - &store, - &user.id, - "http://localhost", - &default_cfg(), - "image/jpeg", - Bytes::from("new"), - ) - .await - .unwrap(); + let cfg = default_cfg(); + let ctx = make_ctx(&store, &media, &cfg); + upload_avatar(&ctx, &user.id, "image/jpeg", Bytes::from("new")) + .await + .unwrap(); assert!(!media.store.lock().unwrap().contains_key(&old_key)); assert!(media.deleted.lock().unwrap().contains(&old_key)); } @@ -225,18 +211,11 @@ async fn upload_banner_stores_file_and_updates_header_url() { let media = MockMedia::default(); let user = make_user(); store.users.lock().unwrap().push(user.clone()); - upload_banner( - &store, - &media, - &store, - &user.id, - "http://localhost", - &default_cfg(), - "image/png", - Bytes::from("banner"), - ) - .await - .unwrap(); + let cfg = default_cfg(); + let ctx = make_ctx(&store, &media, &cfg); + upload_banner(&ctx, &user.id, "image/png", Bytes::from("banner")) + .await + .unwrap(); let key = format!("users/{}/banner.png", user.id.as_uuid()); assert!(media.store.lock().unwrap().contains_key(&key)); let saved = store @@ -266,18 +245,11 @@ async fn upload_banner_deletes_old_file_on_reupload() { .lock() .unwrap() .insert(old_key.clone(), Bytes::from("old")); - upload_banner( - &store, - &media, - &store, - &user.id, - "http://localhost", - &default_cfg(), - "image/png", - Bytes::from("new"), - ) - .await - .unwrap(); + let cfg = default_cfg(); + let ctx = make_ctx(&store, &media, &cfg); + upload_banner(&ctx, &user.id, "image/png", Bytes::from("new")) + .await + .unwrap(); assert!(!media.store.lock().unwrap().contains_key(&old_key)); assert!(media.deleted.lock().unwrap().contains(&old_key)); } diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 1291679..35e7085 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use application::use_cases::profile::UploadConfig; use storage::{build_store, ObjectStorageAdapter, StorageConfig}; -use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; +use activitypub::{build_ap_service, ApFederationAdapter, ApServiceConfig, ThoughtsObjectHandler}; use auth::ApiKeyServiceImpl; use domain::{ errors::DomainError, @@ -16,13 +16,13 @@ use domain::{ ports::{EventPublisher, OutboxWriter}, }; use event_transport::{EventPublisherAdapter, Transport}; -use k_ap::{ActivityPubService, FederationEvent}; +use k_ap::FederationEvent; use nats::NatsTransport; use postgres::activitypub::PgActivityPubRepository; use postgres::engagement::PgEngagementRepository; use postgres::outbox::PgOutboxWriter; use postgres::remote_actor_connections::PgRemoteActorConnectionRepository; -use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; +use postgres_federation::{PgApUserRepository, PgFederationRepository}; use presentation::state::AppState; use crate::config::Config; @@ -125,7 +125,7 @@ pub async fn build(cfg: &Config) -> Infrastructure { // 3. ActivityPub federation let connections_repo = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); - let fed_repo = Arc::new(PostgresFederationRepository::new(pool.clone())); + let fed_repo = Arc::new(PgFederationRepository::new(pool.clone())); let likes: Arc = Arc::new(postgres::like::PgLikeRepository::new(pool.clone())); let boosts: Arc = @@ -138,30 +138,20 @@ pub async fn build(cfg: &Config) -> Infrastructure { likes.clone(), boosts.clone(), )); - let mut ap_builder = ActivityPubService::builder(cfg.base_url.clone()) - .activity_repo(fed_repo.clone()) - .follow_repo(fed_repo.clone()) - .actor_repo(fed_repo.clone()) - .blocklist_repo(fed_repo.clone()) - .user_repo(Arc::new(PostgresApUserRepository::new( - pool.clone(), - cfg.base_url.clone(), - ))) - .content_reader(ap_handler.clone()) - .object_handler(ap_handler) - .allow_registration(cfg.allow_registration) - .software_name("thoughts") - .debug(cfg.debug); - if let Some(publisher) = kap_publisher { - ap_builder = ap_builder.event_publisher(publisher); - } - let raw_ap_service = Arc::new( - ap_builder - .build() - .await - .expect("Failed to build ActivityPubService"), - ); - let ap_service = Arc::new(ApFederationAdapter::new(raw_ap_service, connections_repo)); + let (_raw, ap_service) = build_ap_service(ApServiceConfig { + base_url: cfg.base_url.clone(), + activity_repo: fed_repo.clone(), + follow_repo: fed_repo.clone(), + actor_repo: fed_repo.clone(), + blocklist_repo: fed_repo.clone(), + user_repo: Arc::new(PgApUserRepository::new(pool.clone(), cfg.base_url.clone())), + ap_handler, + connections_repo, + event_publisher: kap_publisher, + allow_registration: cfg.allow_registration, + debug: cfg.debug, + }) + .await; // 4. Storage adapter let storage_cfg = StorageConfig { diff --git a/crates/domain/src/value_objects/mod.rs b/crates/domain/src/value_objects/mod.rs index 04129fb..9b008ca 100644 --- a/crates/domain/src/value_objects/mod.rs +++ b/crates/domain/src/value_objects/mod.rs @@ -89,7 +89,7 @@ impl Email { } } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct PasswordHash(pub String); #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] diff --git a/crates/presentation/src/handlers/api_keys.rs b/crates/presentation/src/handlers/api_keys.rs index de14777..622828c 100644 --- a/crates/presentation/src/handlers/api_keys.rs +++ b/crates/presentation/src/handlers/api_keys.rs @@ -37,11 +37,13 @@ pub async fn post_api_key( Deps(d): Deps, AuthUser(uid): AuthUser, Json(body): Json, -) -> Result, ApiError> { +) -> Result, ApiError> { let (key, raw) = create_api_key(&*d.api_keys, &uid, body.name).await?; - Ok(Json( - serde_json::json!({ "id": key.id.as_uuid(), "name": key.name, "key": raw }), - )) + Ok(Json(CreatedApiKeyResponse { + id: key.id.as_uuid(), + name: key.name, + key: raw, + })) } #[utoipa::path(delete, path = "/api-keys/{id}", params(("id" = uuid::Uuid, Path, description = "Key ID")), responses((status = 204, description = "Deleted")), security(("bearer_auth" = [])))] pub async fn delete_api_key_handler( diff --git a/crates/presentation/src/handlers/feed.rs b/crates/presentation/src/handlers/feed.rs index 63d832b..afd53e3 100644 --- a/crates/presentation/src/handlers/feed.rs +++ b/crates/presentation/src/handlers/feed.rs @@ -5,7 +5,7 @@ use crate::{ handlers::auth::to_user_response, }; use api_types::requests::{PaginationQuery, SearchQuery}; -use api_types::responses::ThoughtResponse; +use api_types::responses::{PagedResponse, ThoughtResponse}; use application::use_cases::feed::{ get_home_feed, get_popular_tags as uc_get_popular_tags, get_public_feed, get_tag_feed, get_user_feed, @@ -75,6 +75,13 @@ impl TryFrom for FeedOptions { } } +fn wants_activity_json(headers: &HeaderMap) -> bool { + headers + .get(header::ACCEPT) + .and_then(|v| v.to_str().ok()) + .is_some_and(|a| a.contains("application/activity+json")) +} + deps_struct!(FeedDeps { feed: FeedRepository, follows: FollowRepository, @@ -116,19 +123,19 @@ pub async fn home_feed( AuthUser(uid): AuthUser, Query(q): Query, Query(opts_q): Query, -) -> Result, ApiError> { +) -> Result>, ApiError> { let page = PageParams { page: q.page(), per_page: q.per_page(), }; let opts = FeedOptions::try_from(opts_q)?; let result = get_home_feed(&*d.feed, &*d.follows, &uid, page, opts).await?; - Ok(Json(serde_json::json!({ - "items": result.items.iter().map(to_thought_response).collect::>(), - "total": result.total, - "page": result.page, - "per_page": result.per_page, - }))) + Ok(Json(PagedResponse { + items: result.items.iter().map(to_thought_response).collect(), + total: result.total, + page: result.page, + per_page: result.per_page, + })) } #[utoipa::path( @@ -141,19 +148,19 @@ pub async fn public_feed( OptionalAuthUser(viewer): OptionalAuthUser, Query(q): Query, Query(opts_q): Query, -) -> Result, ApiError> { +) -> Result>, ApiError> { let page = PageParams { page: q.page(), per_page: q.per_page(), }; let opts = FeedOptions::try_from(opts_q)?; let result = get_public_feed(&*d.feed, viewer, page, opts).await?; - Ok(Json(serde_json::json!({ - "items": result.items.iter().map(to_thought_response).collect::>(), - "total": result.total, - "page": result.page, - "per_page": result.per_page, - }))) + Ok(Json(PagedResponse { + items: result.items.iter().map(to_thought_response).collect(), + total: result.total, + page: result.page, + per_page: result.per_page, + })) } #[utoipa::path( @@ -210,12 +217,7 @@ pub async fn get_following_handler( Query(q): Query, headers: HeaderMap, ) -> Result { - let accept = headers - .get(header::ACCEPT) - .and_then(|v| v.to_str().ok()) - .unwrap_or(""); - - if accept.contains("application/activity+json") { + if wants_activity_json(&headers) { let user = get_user_by_id_or_username(&*d.users, ¶m).await?; let user_id = user.id; let page = q.page().try_into().ok(); @@ -232,10 +234,12 @@ pub async fn get_following_handler( per_page: q.per_page(), }; let result = list_local_following(&*d.follows, &user.id, page).await?; - Ok(Json(serde_json::json!({ - "total": result.total, - "items": result.items.iter().map(to_user_response).collect::>() - })) + Ok(Json(PagedResponse { + items: result.items.iter().map(to_user_response).collect(), + total: result.total, + page: result.page, + per_page: result.per_page, + }) .into_response()) } @@ -253,12 +257,7 @@ pub async fn get_followers_handler( Query(q): Query, headers: HeaderMap, ) -> Result { - let accept = headers - .get(header::ACCEPT) - .and_then(|v| v.to_str().ok()) - .unwrap_or(""); - - if accept.contains("application/activity+json") { + if wants_activity_json(&headers) { let user = get_user_by_id_or_username(&*d.users, ¶m).await?; let user_id = user.id; let page = q.page().try_into().ok(); @@ -275,10 +274,12 @@ pub async fn get_followers_handler( per_page: q.per_page(), }; let result = list_local_followers(&*d.follows, &user.id, page).await?; - Ok(Json(serde_json::json!({ - "total": result.total, - "items": result.items.iter().map(to_user_response).collect::>() - })) + Ok(Json(PagedResponse { + items: result.items.iter().map(to_user_response).collect(), + total: result.total, + page: result.page, + per_page: result.per_page, + }) .into_response()) } @@ -297,7 +298,7 @@ pub async fn user_thoughts_handler( OptionalAuthUser(viewer): OptionalAuthUser, Query(q): Query, Query(opts_q): Query, -) -> Result, ApiError> { +) -> Result>, ApiError> { let user = get_user_by_username(&*d.users, &username).await?; let page = PageParams { page: q.page(), @@ -305,12 +306,12 @@ pub async fn user_thoughts_handler( }; let opts = FeedOptions::try_from(opts_q)?; let result = get_user_feed(&*d.feed, user.id.clone(), page, opts, viewer).await?; - Ok(Json(serde_json::json!({ - "total": result.total, - "page": result.page, - "per_page": result.per_page, - "items": result.items.iter().map(to_thought_response).collect::>() - }))) + Ok(Json(PagedResponse { + items: result.items.iter().map(to_thought_response).collect(), + total: result.total, + page: result.page, + per_page: result.per_page, + })) } #[utoipa::path( @@ -353,18 +354,17 @@ pub async fn tag_thoughts_handler( OptionalAuthUser(viewer): OptionalAuthUser, Query(q): Query, Query(opts_q): Query, -) -> Result, ApiError> { +) -> Result>, ApiError> { let page = PageParams { page: q.page(), per_page: q.per_page(), }; let opts = FeedOptions::try_from(opts_q)?; let result = get_tag_feed(&*d.feed, &tag_name, page, opts, viewer).await?; - Ok(Json(serde_json::json!({ - "tag": tag_name, - "total": result.total, - "page": result.page, - "per_page": result.per_page, - "items": result.items.iter().map(to_thought_response).collect::>(), - }))) + Ok(Json(PagedResponse { + items: result.items.iter().map(to_thought_response).collect(), + total: result.total, + page: result.page, + per_page: result.per_page, + })) } diff --git a/crates/presentation/src/handlers/notifications/mod.rs b/crates/presentation/src/handlers/notifications/mod.rs index f5bc2ef..782432f 100644 --- a/crates/presentation/src/handlers/notifications/mod.rs +++ b/crates/presentation/src/handlers/notifications/mod.rs @@ -3,7 +3,7 @@ use crate::{ errors::ApiError, extractors::{AuthUser, Deps}, }; -use api_types::requests::NotificationUpdateRequest; +use api_types::{requests::NotificationUpdateRequest, responses::NotificationSummaryResponse}; use application::use_cases::notifications::{ count_unread_notifications, list_notifications as uc_list_notifications, mark_all_notifications_read, mark_notification_read as uc_mark_notification_read, @@ -22,17 +22,17 @@ deps_struct!(NotificationsDeps { pub async fn list_notifications( Deps(d): Deps, AuthUser(uid): AuthUser, -) -> Result, ApiError> { +) -> Result, ApiError> { let page = PageParams { page: 1, per_page: 20, }; let result = uc_list_notifications(&*d.notifications, &uid, page).await?; let unread = count_unread_notifications(&*d.notifications, &uid).await?; - Ok(Json(serde_json::json!({ - "total": result.total, - "unread": unread - }))) + Ok(Json(NotificationSummaryResponse { + total: result.total, + unread, + })) } #[utoipa::path(patch, path = "/notifications/{id}", params(("id" = uuid::Uuid, Path, description = "Notification ID")), request_body = NotificationUpdateRequest, responses((status = 204, description = "Marked read")), security(("bearer_auth" = [])))] diff --git a/crates/presentation/src/handlers/users/mod.rs b/crates/presentation/src/handlers/users/mod.rs index 76cfaa5..75e575a 100644 --- a/crates/presentation/src/handlers/users/mod.rs +++ b/crates/presentation/src/handlers/users/mod.rs @@ -6,12 +6,12 @@ use crate::{ }; use api_types::{ requests::{PaginationQuery, UpdateProfileRequest}, - responses::{ErrorResponse, ProfileField, RemoteActorResponse, UserResponse}, + responses::{ErrorResponse, PagedResponse, ProfileField, RemoteActorResponse, UserResponse}, }; use application::use_cases::profile::{ count_local_users, get_user as fetch_user, get_user_by_id_or_username, get_user_profile, list_local_following, list_users, update_profile, upload_avatar as upload_avatar_uc, - upload_banner as upload_banner_uc, UploadConfig, + upload_banner as upload_banner_uc, UploadConfig, UploadContext, }; use axum::{ extract::{Multipart, Path, Query}, @@ -54,6 +54,13 @@ impl FromAppState for UsersDeps { } } +fn wants_activity_json(headers: &HeaderMap) -> bool { + headers + .get(header::ACCEPT) + .and_then(|v| v.to_str().ok()) + .is_some_and(|a| a.contains("application/activity+json")) +} + #[utoipa::path( get, path = "/users/{username}", params(("username" = String, Path, description = "Username")), @@ -68,12 +75,7 @@ pub async fn get_user( OptionalAuthUser(viewer): OptionalAuthUser, headers: HeaderMap, ) -> Result { - let accept = headers - .get(header::ACCEPT) - .and_then(|v| v.to_str().ok()) - .unwrap_or(""); - - if accept.contains("application/activity+json") { + if wants_activity_json(&headers) { let user = get_user_by_id_or_username(&*d.users, &username).await?; let json = d.federation.actor_json(&user.id).await?; Ok(([(header::CONTENT_TYPE, "application/activity+json")], json).into_response()) @@ -145,17 +147,19 @@ pub async fn get_me_following( Deps(d): Deps, AuthUser(uid): AuthUser, Query(q): Query, -) -> Result, ApiError> { +) -> Result>, ApiError> { use domain::models::feed::PageParams; let page = PageParams { page: q.page(), per_page: q.per_page(), }; let result = list_local_following(&*d.follows, &uid, page).await?; - Ok(Json(serde_json::json!({ - "total": result.total, - "items": result.items.iter().map(to_user_response).collect::>(), - }))) + Ok(Json(PagedResponse { + items: result.items.iter().map(to_user_response).collect(), + total: result.total, + page: result.page, + per_page: result.per_page, + })) } #[utoipa::path( @@ -170,7 +174,7 @@ pub async fn get_me_following( pub async fn get_users( Deps(d): Deps, Query(params): Query>, -) -> Result, ApiError> { +) -> Result>, ApiError> { use domain::models::feed::PageParams; let page = params .get("page") @@ -184,38 +188,37 @@ pub async fn get_users( if let Some(q) = params.get("q").filter(|q| !q.trim().is_empty()) { let result = d.search.search_users(q, &page_params).await?; - let users: Vec<_> = result - .items - .iter() - .map(crate::handlers::auth::to_user_response) - .collect(); - return Ok(Json(serde_json::json!({ - "items": users, "total": result.total, "page": result.page, "per_page": result.per_page - }))); + return Ok(Json(PagedResponse { + items: result.items.iter().map(to_user_response).collect(), + total: result.total, + page: result.page, + per_page: result.per_page, + })); } let result = list_users(&*d.users, page_params).await?; - let items: Vec<_> = result + let items: Vec = result .items .iter() - .map(|u| { - serde_json::json!({ - "id": u.id.as_uuid(), - "username": u.username, - "displayName": u.display_name, - "avatarUrl": u.avatar_url, - "bio": u.bio, - "headerUrl": null, - "customCss": null, - "local": true, - "isFollowedByViewer": false, - "joinedAt": null, - }) + .map(|u| UserResponse { + id: u.id.as_uuid(), + username: u.username.clone(), + display_name: u.display_name.clone(), + bio: u.bio.clone(), + avatar_url: u.avatar_url.clone(), + header_url: None, + custom_css: None, + local: true, + is_followed_by_viewer: false, + created_at: chrono::Utc::now(), }) .collect(); - Ok(Json(serde_json::json!({ - "items": items, "total": result.total, "page": result.page, "per_page": result.per_page - }))) + Ok(Json(PagedResponse { + items, + total: result.total, + page: result.page, + per_page: result.per_page, + })) } #[utoipa::path( @@ -266,6 +269,25 @@ pub async fn lookup_handler( })) } +async fn extract_upload_field( + mut multipart: Multipart, +) -> Result<(String, axum::body::Bytes), ApiError> { + let field = multipart + .next_field() + .await + .map_err(|_| ApiError::BadRequest("invalid multipart".into()))? + .ok_or_else(|| ApiError::BadRequest("no file field".into()))?; + let content_type = field + .content_type() + .ok_or_else(|| ApiError::BadRequest("missing content-type on field".into()))? + .to_string(); + let data = field + .bytes() + .await + .map_err(|_| ApiError::BadRequest("failed to read upload".into()))?; + Ok((content_type, data)) +} + #[utoipa::path( put, path = "/users/me/avatar", request_body(content = String, content_type = "multipart/form-data", description = "Image file (JPEG, PNG, WebP, AVIF, GIF)"), @@ -278,35 +300,17 @@ pub async fn lookup_handler( pub async fn upload_avatar( Deps(d): Deps, AuthUser(uid): AuthUser, - mut multipart: Multipart, + multipart: Multipart, ) -> Result, ApiError> { - let field = multipart - .next_field() - .await - .map_err(|_| ApiError::BadRequest("invalid multipart".into()))? - .ok_or_else(|| ApiError::BadRequest("no file field".into()))?; - // Content-type is client-supplied; the use-case allowlist prevents obviously - // wrong types, but magic-byte validation is not performed. Serve media files - // from an isolated origin to prevent MIME-based XSS. - let content_type = field - .content_type() - .ok_or_else(|| ApiError::BadRequest("missing content-type on field".into()))? - .to_string(); - let data = field - .bytes() - .await - .map_err(|_| ApiError::BadRequest("failed to read upload".into()))?; - upload_avatar_uc( - &*d.users, - &*d.media, - &*d.events, - &uid, - &d.base_url, - &d.upload_config, - &content_type, - data, - ) - .await?; + let (content_type, data) = extract_upload_field(multipart).await?; + let ctx = UploadContext { + users: &*d.users, + media: &*d.media, + events: &*d.events, + upload_config: &d.upload_config, + base_url: &d.base_url, + }; + upload_avatar_uc(&ctx, &uid, &content_type, data).await?; let user = fetch_user(&*d.users, &uid).await?; Ok(Json(to_user_response(&user))) } @@ -323,35 +327,17 @@ pub async fn upload_avatar( pub async fn upload_banner( Deps(d): Deps, AuthUser(uid): AuthUser, - mut multipart: Multipart, + multipart: Multipart, ) -> Result, ApiError> { - let field = multipart - .next_field() - .await - .map_err(|_| ApiError::BadRequest("invalid multipart".into()))? - .ok_or_else(|| ApiError::BadRequest("no file field".into()))?; - // Content-type is client-supplied; the use-case allowlist prevents obviously - // wrong types, but magic-byte validation is not performed. Serve media files - // from an isolated origin to prevent MIME-based XSS. - let content_type = field - .content_type() - .ok_or_else(|| ApiError::BadRequest("missing content-type on field".into()))? - .to_string(); - let data = field - .bytes() - .await - .map_err(|_| ApiError::BadRequest("failed to read upload".into()))?; - upload_banner_uc( - &*d.users, - &*d.media, - &*d.events, - &uid, - &d.base_url, - &d.upload_config, - &content_type, - data, - ) - .await?; + let (content_type, data) = extract_upload_field(multipart).await?; + let ctx = UploadContext { + users: &*d.users, + media: &*d.media, + events: &*d.events, + upload_config: &d.upload_config, + base_url: &d.base_url, + }; + upload_banner_uc(&ctx, &uid, &content_type, data).await?; let user = fetch_user(&*d.users, &uid).await?; Ok(Json(to_user_response(&user))) } diff --git a/crates/presentation/src/testing.rs b/crates/presentation/src/testing.rs index 26080d7..344956d 100644 --- a/crates/presentation/src/testing.rs +++ b/crates/presentation/src/testing.rs @@ -31,73 +31,60 @@ impl PasswordHasher for NoOpHasher { } } -/// No-op ActivityPubRepository for presentation layer tests. pub struct NoOpApRepo; #[async_trait] impl ActivityPubRepository for NoOpApRepo { - async fn outbox_entries_for_actor( - &self, - _uid: &UserId, - ) -> Result, DomainError> { + async fn outbox_entries_for_actor(&self, _: &UserId) -> Result, DomainError> { Ok(vec![]) } async fn outbox_page_for_actor( &self, - _uid: &UserId, - _before: Option>, - _limit: usize, + _: &UserId, + _: Option>, + _: usize, ) -> Result, DomainError> { Ok(vec![]) } - async fn find_remote_actor_id( - &self, - _actor_ap_url: &str, - ) -> Result, DomainError> { + async fn find_remote_actor_id(&self, _: &str) -> Result, DomainError> { Ok(None) } - async fn intern_remote_actor(&self, _actor_ap_url: &str) -> Result { + async fn intern_remote_actor(&self, _: &str) -> Result { Err(DomainError::NotFound) } async fn update_remote_actor_display( &self, - _user_id: &UserId, - _display_name: Option<&str>, - _avatar_url: Option<&str>, + _: &UserId, + _: Option<&str>, + _: Option<&str>, ) -> Result<(), DomainError> { Ok(()) } async fn accept_note( &self, - _input: activitypub::AcceptNoteInput<'_>, + _: activitypub::AcceptNoteInput<'_>, ) -> Result { Ok(ThoughtId::from_uuid(uuid::Uuid::new_v4())) } - async fn apply_note_update(&self, _ap_id: &str, _new_content: &str) -> Result<(), DomainError> { + async fn apply_note_update(&self, _: &str, _: &str) -> Result<(), DomainError> { Ok(()) } - async fn retract_note(&self, _ap_id: &str) -> Result<(), DomainError> { + async fn retract_note(&self, _: &str) -> Result<(), DomainError> { Ok(()) } - async fn retract_actor_notes(&self, _actor_ap_url: &str) -> Result<(), DomainError> { + async fn retract_actor_notes(&self, _: &str) -> Result<(), DomainError> { Ok(()) } async fn count_local_notes(&self) -> Result { Ok(0) } - async fn get_thought_ap_id( - &self, - _thought_id: &ThoughtId, - ) -> Result, DomainError> { + async fn get_thought_ap_id(&self, _: &ThoughtId) -> Result, DomainError> { Ok(None) } - async fn get_actor_ap_urls( - &self, - _user_id: &UserId, - ) -> Result, DomainError> { + async fn get_actor_ap_urls(&self, _: &UserId) -> Result, DomainError> { Ok(None) } - async fn sync_remote_actor_to_user(&self, _actor_ap_url: &str) -> Result<(), DomainError> { + async fn sync_remote_actor_to_user(&self, _: &str) -> Result<(), DomainError> { Ok(()) } } diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index 0c1f28b..5df1781 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -3,15 +3,16 @@ use postgres::remote_actor_connections::PgRemoteActorConnectionRepository; use sqlx::PgPool; use std::sync::Arc; -use activitypub::{ActivityPubRepository, OutboundFederationPort}; -use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; +use activitypub::ThoughtsObjectHandler; +use activitypub::{ + build_ap_service, ActivityPubRepository, ApServiceConfig, OutboundFederationPort, +}; use application::services::{ FederationEventService, FederationManagementEventService, NotificationEventService, }; use domain::ports::EventPublisher; -use k_ap::ActivityPubService; use postgres::activitypub::PgActivityPubRepository; -use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; +use postgres_federation::{PgApUserRepository, PgFederationRepository}; use crate::handlers::{FederationHandler, FederationManagementHandler, NotificationHandler}; @@ -44,7 +45,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker // ActivityPub service (for federation fan-out) let connections_repo_worker = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); - let fed_repo_worker = Arc::new(PostgresFederationRepository::new(pool.clone())); + let fed_repo_worker = Arc::new(PgFederationRepository::new(pool.clone())); let ap_handler_worker = Arc::new(ThoughtsObjectHandler::new( Arc::new(PgActivityPubRepository::new(pool.clone())), base_url, @@ -53,27 +54,20 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker Arc::new(postgres::like::PgLikeRepository::new(pool.clone())), Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())), )); - let raw_ap_service = Arc::new( - ActivityPubService::builder(base_url.to_string()) - .activity_repo(fed_repo_worker.clone()) - .follow_repo(fed_repo_worker.clone()) - .actor_repo(fed_repo_worker.clone()) - .blocklist_repo(fed_repo_worker.clone()) - .user_repo(Arc::new(PostgresApUserRepository::new( - pool.clone(), - base_url.to_string(), - ))) - .content_reader(ap_handler_worker.clone()) - .object_handler(ap_handler_worker) - .software_name("thoughts") - .build() - .await - .expect("ActivityPubService build failed"), - ); - let ap_service = Arc::new(ApFederationAdapter::new( - raw_ap_service.clone(), - connections_repo_worker, - )); + let (raw_ap_service, ap_service) = build_ap_service(ApServiceConfig { + base_url: base_url.to_string(), + activity_repo: fed_repo_worker.clone(), + follow_repo: fed_repo_worker.clone(), + actor_repo: fed_repo_worker.clone(), + blocklist_repo: fed_repo_worker.clone(), + user_repo: Arc::new(PgApUserRepository::new(pool.clone(), base_url.to_string())), + ap_handler: ap_handler_worker, + connections_repo: connections_repo_worker, + event_publisher: None, + allow_registration: false, + debug: false, + }) + .await; let ap_outbound = ap_service.clone() as Arc; let ap_repo_worker = Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc;