refactor: type safety + dedup cleanup across 13 code smells
Some checks failed
test / unit (push) Has been cancelled
lint / lint (push) Has been cancelled

- typed PagedResponse/CreatedApiKeyResponse/NotificationSummaryResponse replace json! blocks
- extract TagRow/ApiKeyRow/OutboxRow to module level, top_friend uses sqlx flatten
- add should_broadcast() helper, inline dead let bindings in federation_event
- add UploadContext struct, extract_upload_field, wants_activity_json helpers
- rename PostgresFederationRepository→PgFederationRepository, PostgresApUserRepository→PgApUserRepository
- add IntoAnyhow trait replacing ~30 .map_err(|e| anyhow!(e)) calls
- extract build_ap_service shared between bootstrap and worker factories
- add postgres/constants.rs, PartialEq+Eq on PasswordHash
This commit is contained in:
2026-05-29 12:02:03 +02:00
parent 84edf58de6
commit 9798a1d829
20 changed files with 485 additions and 569 deletions

View File

@@ -11,3 +11,48 @@ pub use port::{
}; };
pub use service::ApFederationAdapter; pub use service::ApFederationAdapter;
pub use urls::ThoughtsUrls; pub use urls::ThoughtsUrls;
use domain::ports::RemoteActorConnectionRepository;
use k_ap::ActivityPubService;
use std::sync::Arc;
pub struct ApServiceConfig {
pub base_url: String,
pub activity_repo: Arc<dyn k_ap::ActivityRepository>,
pub follow_repo: Arc<dyn k_ap::FollowRepository>,
pub actor_repo: Arc<dyn k_ap::ActorRepository>,
pub blocklist_repo: Arc<dyn k_ap::BlocklistRepository>,
pub user_repo: Arc<dyn k_ap::ApUserRepository>,
pub ap_handler: Arc<ThoughtsObjectHandler>,
pub connections_repo: Arc<dyn RemoteActorConnectionRepository>,
pub event_publisher: Option<Arc<dyn k_ap::data::EventPublisher>>,
pub allow_registration: bool,
pub debug: bool,
}
pub async fn build_ap_service(
cfg: ApServiceConfig,
) -> (Arc<ActivityPubService>, Arc<ApFederationAdapter>) {
let mut builder = ActivityPubService::builder(cfg.base_url)
.activity_repo(cfg.activity_repo)
.follow_repo(cfg.follow_repo)
.actor_repo(cfg.actor_repo)
.blocklist_repo(cfg.blocklist_repo)
.user_repo(cfg.user_repo)
.content_reader(cfg.ap_handler.clone())
.object_handler(cfg.ap_handler)
.allow_registration(cfg.allow_registration)
.software_name("thoughts")
.debug(cfg.debug);
if let Some(publisher) = cfg.event_publisher {
builder = builder.event_publisher(publisher);
}
let raw = Arc::new(
builder
.build()
.await
.expect("Failed to build ActivityPubService"),
);
let adapter = Arc::new(ApFederationAdapter::new(raw.clone(), cfg.connections_repo));
(raw, adapter)
}

View File

@@ -1,8 +1,17 @@
use anyhow::{anyhow, Result}; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sqlx::PgPool; use sqlx::PgPool;
trait IntoAnyhow<T> {
fn into_anyhow(self) -> Result<T>;
}
impl<T> IntoAnyhow<T> for std::result::Result<T, sqlx::Error> {
fn into_anyhow(self) -> Result<T> {
self.map_err(|e| anyhow::anyhow!(e))
}
}
use k_ap::{ use k_ap::{
ActivityRepository, ActorRepository, ApActorType, ApUser, ApUserRepository, BlockedDomain, ActivityRepository, ActorRepository, ApActorType, ApUser, ApUserRepository, BlockedDomain,
BlocklistRepository, FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, BlocklistRepository, FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor,
@@ -10,11 +19,11 @@ use k_ap::{
// ── PostgresFederationRepository ───────────────────────────────────────────── // ── PostgresFederationRepository ─────────────────────────────────────────────
pub struct PostgresFederationRepository { pub struct PgFederationRepository {
pool: PgPool, pool: PgPool,
} }
impl PostgresFederationRepository { impl PgFederationRepository {
pub fn new(pool: PgPool) -> Self { pub fn new(pool: PgPool) -> Self {
Self { pool } Self { pool }
} }
@@ -71,7 +80,7 @@ fn map_remote_actor(r: RemoteActorRow) -> RemoteActor {
// ── ActivityRepository ──────────────────────────────────────────────────────── // ── ActivityRepository ────────────────────────────────────────────────────────
#[async_trait] #[async_trait]
impl ActivityRepository for PostgresFederationRepository { impl ActivityRepository for PgFederationRepository {
async fn is_activity_processed(&self, activity_id: &str) -> Result<bool> { async fn is_activity_processed(&self, activity_id: &str) -> Result<bool> {
let n: i64 = sqlx::query_scalar( let n: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM federation_processed_activities WHERE activity_id=$1", "SELECT COUNT(*) FROM federation_processed_activities WHERE activity_id=$1",
@@ -79,7 +88,7 @@ impl ActivityRepository for PostgresFederationRepository {
.bind(activity_id) .bind(activity_id)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(n > 0) Ok(n > 0)
} }
@@ -90,7 +99,7 @@ impl ActivityRepository for PostgresFederationRepository {
.bind(activity_id) .bind(activity_id)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
} }
@@ -98,7 +107,7 @@ impl ActivityRepository for PostgresFederationRepository {
// ── FollowRepository ────────────────────────────────────────────────────────── // ── FollowRepository ──────────────────────────────────────────────────────────
#[async_trait] #[async_trait]
impl FollowRepository for PostgresFederationRepository { impl FollowRepository for PgFederationRepository {
async fn add_follower( async fn add_follower(
&self, &self,
local_user_id: uuid::Uuid, local_user_id: uuid::Uuid,
@@ -118,7 +127,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(follow_activity_id) .bind(follow_activity_id)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -134,7 +143,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(remote_actor_url) .bind(remote_actor_url)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
} }
async fn remove_follower( async fn remove_follower(
@@ -149,7 +158,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(remote_actor_url) .bind(remote_actor_url)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -172,7 +181,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(local_user_id) .bind(local_user_id)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|rows| { .map(|rows| {
rows.into_iter() rows.into_iter()
.map(|r| Follower { .map(|r| Follower {
@@ -210,7 +219,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(offset as i64) .bind(offset as i64)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|rows| { .map(|rows| {
rows.into_iter() rows.into_iter()
.map(|r| Follower { .map(|r| Follower {
@@ -227,7 +236,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(local_user_id) .bind(local_user_id)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(n as usize) Ok(n as usize)
} }
@@ -238,7 +247,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(local_user_id) .bind(local_user_id)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(n as usize) Ok(n as usize)
} }
@@ -262,7 +271,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(offset as i64) .bind(offset as i64)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|rows| rows.into_iter().map(map_remote_actor).collect()) .map(|rows| rows.into_iter().map(map_remote_actor).collect())
} }
@@ -287,7 +296,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(local_user_id) .bind(local_user_id)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(rows) Ok(rows)
} }
@@ -303,7 +312,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(local_user_id) .bind(local_user_id)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|rows| rows.into_iter().map(map_remote_actor).collect()) .map(|rows| rows.into_iter().map(map_remote_actor).collect())
} }
@@ -321,7 +330,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(status_str(&status)) .bind(status_str(&status))
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -344,7 +353,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(&actor.outbox_url) .bind(&actor.outbox_url)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -360,7 +369,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(remote_actor_url) .bind(remote_actor_url)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
} }
async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> {
@@ -371,7 +380,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(actor_url) .bind(actor_url)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -387,7 +396,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(local_user_id) .bind(local_user_id)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|rows| rows.into_iter().map(map_remote_actor).collect()) .map(|rows| rows.into_iter().map(map_remote_actor).collect())
} }
@@ -411,7 +420,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(offset as i64) .bind(offset as i64)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|rows| rows.into_iter().map(map_remote_actor).collect()) .map(|rows| rows.into_iter().map(map_remote_actor).collect())
} }
@@ -421,7 +430,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(local_user_id) .bind(local_user_id)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(n as usize) Ok(n as usize)
} }
@@ -443,7 +452,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(s) .bind(s)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -459,7 +468,7 @@ impl FollowRepository for PostgresFederationRepository {
.bind(remote_actor_url) .bind(remote_actor_url)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
} }
async fn migrate_follower_actor( async fn migrate_follower_actor(
@@ -467,7 +476,7 @@ impl FollowRepository for PostgresFederationRepository {
old_actor_url: &str, old_actor_url: &str,
new_actor_url: &str, new_actor_url: &str,
) -> Result<Vec<uuid::Uuid>> { ) -> Result<Vec<uuid::Uuid>> {
let mut tx = self.pool.begin().await.map_err(|e| anyhow!(e))?; let mut tx = self.pool.begin().await.into_anyhow()?;
let affected: Vec<uuid::Uuid> = sqlx::query_scalar( let affected: Vec<uuid::Uuid> = sqlx::query_scalar(
"INSERT INTO federation_following(local_user_id, remote_actor_url, follow_activity_id, outbox_url) "INSERT INTO federation_following(local_user_id, remote_actor_url, follow_activity_id, outbox_url)
@@ -481,15 +490,15 @@ impl FollowRepository for PostgresFederationRepository {
.bind(new_actor_url) .bind(new_actor_url)
.fetch_all(&mut *tx) .fetch_all(&mut *tx)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
sqlx::query("DELETE FROM federation_following WHERE remote_actor_url = $1") sqlx::query("DELETE FROM federation_following WHERE remote_actor_url = $1")
.bind(old_actor_url) .bind(old_actor_url)
.execute(&mut *tx) .execute(&mut *tx)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
tx.commit().await.map_err(|e| anyhow!(e))?; tx.commit().await.into_anyhow()?;
Ok(affected) Ok(affected)
} }
} }
@@ -497,7 +506,7 @@ impl FollowRepository for PostgresFederationRepository {
// ── ActorRepository ─────────────────────────────────────────────────────────── // ── ActorRepository ───────────────────────────────────────────────────────────
#[async_trait] #[async_trait]
impl ActorRepository for PostgresFederationRepository { impl ActorRepository for PgFederationRepository {
async fn get_local_actor_keypair( async fn get_local_actor_keypair(
&self, &self,
user_id: uuid::Uuid, user_id: uuid::Uuid,
@@ -513,7 +522,7 @@ impl ActorRepository for PostgresFederationRepository {
.bind(user_id) .bind(user_id)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(row.and_then(|r| match (r.public_key, r.private_key) { Ok(row.and_then(|r| match (r.public_key, r.private_key) {
(Some(pub_k), Some(priv_k)) => Some((pub_k, priv_k)), (Some(pub_k), Some(priv_k)) => Some((pub_k, priv_k)),
_ => None, _ => None,
@@ -532,7 +541,7 @@ impl ActorRepository for PostgresFederationRepository {
.bind(&private_key) .bind(&private_key)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -568,7 +577,7 @@ impl ActorRepository for PostgresFederationRepository {
.bind(also_known_as.as_deref()) .bind(also_known_as.as_deref())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -581,7 +590,7 @@ impl ActorRepository for PostgresFederationRepository {
.bind(actor_url) .bind(actor_url)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|o| o.map(map_remote_actor)) .map(|o| o.map(map_remote_actor))
} }
@@ -602,7 +611,7 @@ impl ActorRepository for PostgresFederationRepository {
.bind(announced_at) .bind(announced_at)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -612,7 +621,7 @@ impl ActorRepository for PostgresFederationRepository {
.bind(actor_url) .bind(actor_url)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -622,7 +631,7 @@ impl ActorRepository for PostgresFederationRepository {
.bind(object_url) .bind(object_url)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(n as usize) Ok(n as usize)
} }
} }
@@ -630,7 +639,7 @@ impl ActorRepository for PostgresFederationRepository {
// ── BlocklistRepository ─────────────────────────────────────────────────────── // ── BlocklistRepository ───────────────────────────────────────────────────────
#[async_trait] #[async_trait]
impl BlocklistRepository for PostgresFederationRepository { impl BlocklistRepository for PgFederationRepository {
async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()> { async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()> {
sqlx::query( sqlx::query(
"INSERT INTO federation_blocked_domains(domain,reason) VALUES($1,$2) ON CONFLICT(domain) DO NOTHING", "INSERT INTO federation_blocked_domains(domain,reason) VALUES($1,$2) ON CONFLICT(domain) DO NOTHING",
@@ -639,7 +648,7 @@ impl BlocklistRepository for PostgresFederationRepository {
.bind(reason) .bind(reason)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -648,7 +657,7 @@ impl BlocklistRepository for PostgresFederationRepository {
.bind(domain) .bind(domain)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -664,7 +673,7 @@ impl BlocklistRepository for PostgresFederationRepository {
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|rows| { .map(|rows| {
rows.into_iter() rows.into_iter()
.map(|r| BlockedDomain { .map(|r| BlockedDomain {
@@ -682,7 +691,7 @@ impl BlocklistRepository for PostgresFederationRepository {
.bind(domain) .bind(domain)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(n > 0) Ok(n > 0)
} }
@@ -694,7 +703,7 @@ impl BlocklistRepository for PostgresFederationRepository {
.bind(actor_url) .bind(actor_url)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -704,7 +713,7 @@ impl BlocklistRepository for PostgresFederationRepository {
.bind(actor_url) .bind(actor_url)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
.map(|_| ()) .map(|_| ())
} }
@@ -715,7 +724,7 @@ impl BlocklistRepository for PostgresFederationRepository {
.bind(local_user_id) .bind(local_user_id)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| anyhow!(e)) .into_anyhow()
} }
async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool> { async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool> {
@@ -726,7 +735,7 @@ impl BlocklistRepository for PostgresFederationRepository {
.bind(actor_url) .bind(actor_url)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(n > 0) Ok(n > 0)
} }
} }
@@ -744,12 +753,12 @@ struct UserRow {
also_known_as: Option<String>, also_known_as: Option<String>,
} }
pub struct PostgresApUserRepository { pub struct PgApUserRepository {
pool: PgPool, pool: PgPool,
base_url: String, base_url: String,
} }
impl PostgresApUserRepository { impl PgApUserRepository {
pub fn new(pool: PgPool, base_url: String) -> Self { pub fn new(pool: PgPool, base_url: String) -> Self {
Self { pool, base_url } Self { pool, base_url }
} }
@@ -777,7 +786,7 @@ impl PostgresApUserRepository {
} }
#[async_trait] #[async_trait]
impl ApUserRepository for PostgresApUserRepository { impl ApUserRepository for PgApUserRepository {
async fn find_by_id(&self, id: uuid::Uuid) -> Result<Option<ApUser>> { async fn find_by_id(&self, id: uuid::Uuid) -> Result<Option<ApUser>> {
let row = sqlx::query_as::<_, UserRow>( let row = sqlx::query_as::<_, UserRow>(
"SELECT id,username,display_name,bio,avatar_url,header_url,also_known_as FROM users WHERE id=$1 AND local=true", "SELECT id,username,display_name,bio,avatar_url,header_url,also_known_as FROM users WHERE id=$1 AND local=true",
@@ -785,7 +794,7 @@ impl ApUserRepository for PostgresApUserRepository {
.bind(id) .bind(id)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(row.map(|r| self.row_to_ap_user(r))) Ok(row.map(|r| self.row_to_ap_user(r)))
} }
@@ -796,7 +805,7 @@ impl ApUserRepository for PostgresApUserRepository {
.bind(username) .bind(username)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(row.map(|r| self.row_to_ap_user(r))) Ok(row.map(|r| self.row_to_ap_user(r)))
} }
@@ -804,7 +813,7 @@ impl ApUserRepository for PostgresApUserRepository {
let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE local=true") let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE local=true")
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| anyhow!(e))?; .into_anyhow()?;
Ok(n as usize) Ok(n as usize)
} }
} }

View File

@@ -13,6 +13,40 @@ use domain::{
value_objects::{Content, ThoughtId, UserId, Username}, value_objects::{Content, ThoughtId, UserId, Username},
}; };
#[derive(sqlx::FromRow)]
struct OutboxRow {
id: uuid::Uuid,
user_id: uuid::Uuid,
content: String,
created_at: DateTime<Utc>,
in_reply_to_id: Option<uuid::Uuid>,
content_warning: Option<String>,
sensitive: bool,
username: String,
updated_at: Option<DateTime<Utc>>,
}
impl OutboxRow {
fn into_entry(self) -> OutboxEntry {
OutboxEntry {
thought: Thought {
id: ThoughtId::from_uuid(self.id),
user_id: UserId::from_uuid(self.user_id),
content: Content::new_remote(self.content),
in_reply_to_id: self.in_reply_to_id.map(ThoughtId::from_uuid),
visibility: Visibility::Public,
content_warning: self.content_warning,
sensitive: self.sensitive,
local: true,
created_at: self.created_at,
updated_at: self.updated_at,
note_extensions: None,
},
author_username: Username::from_trusted(self.username),
}
}
}
pub struct PgActivityPubRepository { pub struct PgActivityPubRepository {
pool: PgPool, pool: PgPool,
} }
@@ -29,19 +63,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
&self, &self,
user_id: &UserId, user_id: &UserId,
) -> Result<Vec<OutboxEntry>, DomainError> { ) -> Result<Vec<OutboxEntry>, DomainError> {
#[derive(sqlx::FromRow)] sqlx::query_as::<_, OutboxRow>(
struct Row {
id: uuid::Uuid,
user_id: uuid::Uuid,
content: String,
created_at: DateTime<Utc>,
in_reply_to_id: Option<uuid::Uuid>,
content_warning: Option<String>,
sensitive: bool,
username: String,
updated_at: Option<DateTime<Utc>>,
}
sqlx::query_as::<_, Row>(
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
FROM thoughts t JOIN users u ON u.id=t.user_id FROM thoughts t JOIN users u ON u.id=t.user_id
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'
@@ -51,26 +73,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.into_domain() .into_domain()
.map(|rows| { .map(|rows| rows.into_iter().map(OutboxRow::into_entry).collect())
rows.into_iter()
.map(|r| OutboxEntry {
thought: Thought {
id: ThoughtId::from_uuid(r.id),
user_id: UserId::from_uuid(r.user_id),
content: Content::new_remote(r.content),
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
visibility: Visibility::Public,
content_warning: r.content_warning,
sensitive: r.sensitive,
local: true,
created_at: r.created_at,
updated_at: r.updated_at,
note_extensions: None,
},
author_username: Username::from_trusted(r.username),
})
.collect()
})
} }
async fn outbox_page_for_actor( async fn outbox_page_for_actor(
@@ -79,20 +82,8 @@ impl ActivityPubRepository for PgActivityPubRepository {
before: Option<DateTime<Utc>>, before: Option<DateTime<Utc>>,
limit: usize, limit: usize,
) -> Result<Vec<OutboxEntry>, DomainError> { ) -> Result<Vec<OutboxEntry>, DomainError> {
#[derive(sqlx::FromRow)]
struct Row {
id: uuid::Uuid,
user_id: uuid::Uuid,
content: String,
created_at: DateTime<Utc>,
in_reply_to_id: Option<uuid::Uuid>,
content_warning: Option<String>,
sensitive: bool,
username: String,
updated_at: Option<DateTime<Utc>>,
}
let rows = if let Some(before) = before { let rows = if let Some(before) = before {
sqlx::query_as::<_, Row>( sqlx::query_as::<_, OutboxRow>(
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
FROM thoughts t JOIN users u ON u.id=t.user_id FROM thoughts t JOIN users u ON u.id=t.user_id
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2 WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2
@@ -104,7 +95,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
} else { } else {
sqlx::query_as::<_, Row>( sqlx::query_as::<_, OutboxRow>(
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at "SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
FROM thoughts t JOIN users u ON u.id=t.user_id FROM thoughts t JOIN users u ON u.id=t.user_id
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'
@@ -117,25 +108,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
} }
.into_domain()?; .into_domain()?;
Ok(rows Ok(rows.into_iter().map(OutboxRow::into_entry).collect())
.into_iter()
.map(|r| OutboxEntry {
thought: Thought {
id: ThoughtId::from_uuid(r.id),
user_id: UserId::from_uuid(r.user_id),
content: Content::new_remote(r.content),
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
visibility: Visibility::Public,
content_warning: r.content_warning,
sensitive: r.sensitive,
local: true,
created_at: r.created_at,
updated_at: r.updated_at,
note_extensions: None,
},
author_username: Username::from_trusted(r.username),
})
.collect())
} }
async fn find_remote_actor_id( async fn find_remote_actor_id(

View File

@@ -9,6 +9,27 @@ use domain::{
}; };
use sqlx::PgPool; use sqlx::PgPool;
#[derive(sqlx::FromRow)]
struct ApiKeyRow {
id: uuid::Uuid,
user_id: uuid::Uuid,
key_hash: String,
name: String,
created_at: DateTime<Utc>,
}
impl ApiKeyRow {
fn into_domain(self) -> ApiKey {
ApiKey {
id: ApiKeyId::from_uuid(self.id),
user_id: UserId::from_uuid(self.user_id),
key_hash: self.key_hash,
name: self.name,
created_at: self.created_at,
}
}
}
pub struct PgApiKeyRepository { pub struct PgApiKeyRepository {
pool: PgPool, pool: PgPool,
} }
@@ -36,45 +57,21 @@ impl ApiKeyRepository for PgApiKeyRepository {
} }
async fn find_by_hash(&self, hash: &str) -> Result<Option<ApiKey>, DomainError> { async fn find_by_hash(&self, hash: &str) -> Result<Option<ApiKey>, DomainError> {
#[derive(sqlx::FromRow)] sqlx::query_as::<_, ApiKeyRow>(
struct Row {
id: uuid::Uuid,
user_id: uuid::Uuid,
key_hash: String,
name: String,
created_at: DateTime<Utc>,
}
sqlx::query_as::<_, Row>(
"SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE key_hash=$1", "SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE key_hash=$1",
) )
.bind(hash) .bind(hash)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.into_domain() .into_domain()
.map(|o| { .map(|o| o.map(ApiKeyRow::into_domain))
o.map(|r| ApiKey {
id: ApiKeyId::from_uuid(r.id),
user_id: UserId::from_uuid(r.user_id),
key_hash: r.key_hash,
name: r.name,
created_at: r.created_at,
})
})
} }
async fn list_for_user(&self, user_id: &UserId) -> Result<Vec<ApiKey>, DomainError> { async fn list_for_user(&self, user_id: &UserId) -> Result<Vec<ApiKey>, DomainError> {
#[derive(sqlx::FromRow)] sqlx::query_as::<_, ApiKeyRow>("SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE user_id=$1 ORDER BY created_at DESC")
struct Row {
id: uuid::Uuid,
user_id: uuid::Uuid,
key_hash: String,
name: String,
created_at: DateTime<Utc>,
}
sqlx::query_as::<_, Row>("SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE user_id=$1 ORDER BY created_at DESC")
.bind(user_id.as_uuid()).fetch_all(&self.pool).await .bind(user_id.as_uuid()).fetch_all(&self.pool).await
.into_domain() .into_domain()
.map(|rows| rows.into_iter().map(|r| ApiKey { id: ApiKeyId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), key_hash: r.key_hash, name: r.name, created_at: r.created_at }).collect()) .map(|rows| rows.into_iter().map(ApiKeyRow::into_domain).collect())
} }
async fn delete(&self, id: &ApiKeyId, user_id: &UserId) -> Result<(), DomainError> { async fn delete(&self, id: &ApiKeyId, user_id: &UserId) -> Result<(), DomainError> {

View File

@@ -0,0 +1,8 @@
pub const STATUS_ACCEPTED: &str = "accepted";
pub const STATUS_PENDING: &str = "pending";
pub const STATUS_REJECTED: &str = "rejected";
pub const VIS_PUBLIC: &str = "public";
pub const VIS_UNLISTED: &str = "unlisted";
pub const VIS_FOLLOWERS: &str = "followers";
pub const VIS_DIRECT: &str = "direct";

View File

@@ -2,6 +2,7 @@ pub mod activitypub;
pub mod api_key; pub mod api_key;
pub mod block; pub mod block;
pub mod boost; pub mod boost;
pub mod constants;
mod db_error; mod db_error;
pub mod engagement; pub mod engagement;
pub mod failed_event; pub mod failed_event;

View File

@@ -12,6 +12,12 @@ use domain::{
}; };
use sqlx::PgPool; use sqlx::PgPool;
#[derive(sqlx::FromRow)]
struct TagRow {
id: i32,
name: String,
}
pub struct PgTagRepository { pub struct PgTagRepository {
pool: PgPool, pool: PgPool,
} }
@@ -30,12 +36,7 @@ impl TagRepository for PgTagRepository {
.execute(&self.pool) .execute(&self.pool)
.await .await
.into_domain()?; .into_domain()?;
#[derive(sqlx::FromRow)] let row = sqlx::query_as::<_, TagRow>("SELECT id,name FROM tags WHERE name=$1")
struct Row {
id: i32,
name: String,
}
let row = sqlx::query_as::<_, Row>("SELECT id,name FROM tags WHERE name=$1")
.bind(&name) .bind(&name)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
@@ -72,12 +73,7 @@ impl TagRepository for PgTagRepository {
} }
async fn list_for_thought(&self, thought_id: &ThoughtId) -> Result<Vec<Tag>, DomainError> { async fn list_for_thought(&self, thought_id: &ThoughtId) -> Result<Vec<Tag>, DomainError> {
#[derive(sqlx::FromRow)] sqlx::query_as::<_, TagRow>(
struct Row {
id: i32,
name: String,
}
sqlx::query_as::<_, Row>(
"SELECT t.id,t.name FROM tags t JOIN thought_tags tt ON tt.tag_id=t.id WHERE tt.thought_id=$1" "SELECT t.id,t.name FROM tags t JOIN thought_tags tt ON tt.tag_id=t.id WHERE tt.thought_id=$1"
).bind(thought_id.as_uuid()).fetch_all(&self.pool).await ).bind(thought_id.as_uuid()).fetch_all(&self.pool).await
.into_domain() .into_domain()

View File

@@ -44,24 +44,14 @@ impl TopFriendRepository for PgTopFriendRepository {
async fn list_for_user(&self, user_id: &UserId) -> Result<Vec<(TopFriend, User)>, DomainError> { async fn list_for_user(&self, user_id: &UserId) -> Result<Vec<(TopFriend, User)>, DomainError> {
#[derive(sqlx::FromRow)] #[derive(sqlx::FromRow)]
struct Row { struct TopFriendRow {
tf_user_id: uuid::Uuid, tf_user_id: uuid::Uuid,
friend_id: uuid::Uuid, friend_id: uuid::Uuid,
position: i16, position: i16,
id: uuid::Uuid, #[sqlx(flatten)]
username: String, user: crate::user::UserRow,
email: String,
password_hash: String,
display_name: Option<String>,
bio: Option<String>,
avatar_url: Option<String>,
header_url: Option<String>,
custom_css: Option<String>,
local: bool,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
} }
let rows = sqlx::query_as::<_, Row>( let rows = sqlx::query_as::<_, TopFriendRow>(
"SELECT tf.user_id AS tf_user_id, tf.friend_id, tf.position, "SELECT tf.user_id AS tf_user_id, tf.friend_id, tf.position,
u.id, u.username, u.email, u.password_hash, u.display_name, u.bio, u.id, u.username, u.email, u.password_hash, u.display_name, u.bio,
u.avatar_url, u.header_url, u.custom_css, u.local, u.avatar_url, u.header_url, u.custom_css, u.local,
@@ -77,27 +67,12 @@ impl TopFriendRepository for PgTopFriendRepository {
Ok(rows Ok(rows
.into_iter() .into_iter()
.map(|r| { .map(|r| {
use domain::value_objects::{Email, PasswordHash, Username};
let tf = TopFriend { let tf = TopFriend {
user_id: UserId::from_uuid(r.tf_user_id), user_id: UserId::from_uuid(r.tf_user_id),
friend_id: UserId::from_uuid(r.friend_id), friend_id: UserId::from_uuid(r.friend_id),
position: r.position, position: r.position,
}; };
let u = User { (tf, User::from(r.user))
id: UserId::from_uuid(r.id),
username: Username::from_trusted(r.username),
email: Email::from_trusted(r.email),
password_hash: PasswordHash(r.password_hash),
display_name: r.display_name,
bio: r.bio,
avatar_url: r.avatar_url,
header_url: r.header_url,
custom_css: r.custom_css,
local: r.local,
created_at: r.created_at,
updated_at: r.updated_at,
};
(tf, u)
}) })
.collect()) .collect())
} }

View File

@@ -83,6 +83,13 @@ pub struct TopFriendsResponse {
pub top_friends: Vec<UserResponse>, pub top_friends: Vec<UserResponse>,
} }
#[derive(Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct NotificationSummaryResponse {
pub total: i64,
pub unread: u64,
}
#[derive(Serialize, utoipa::ToSchema)] #[derive(Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ErrorResponse { pub struct ErrorResponse {

View File

@@ -8,6 +8,10 @@ use domain::{
}; };
use std::sync::Arc; use std::sync::Arc;
fn should_broadcast(t: &domain::models::thought::Thought) -> bool {
t.local && matches!(t.visibility, Visibility::Public | Visibility::Unlisted)
}
pub struct FederationEventService { pub struct FederationEventService {
pub thoughts: Arc<dyn ThoughtRepository>, pub thoughts: Arc<dyn ThoughtRepository>,
pub users: Arc<dyn UserReader>, pub users: Arc<dyn UserReader>,
@@ -32,15 +36,7 @@ impl FederationEventService {
.. ..
} => { } => {
let thought = match self.thoughts.find_by_id(thought_id).await? { let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) Some(t) if should_broadcast(&t) => t,
if t.local
&& matches!(
t.visibility,
Visibility::Public | Visibility::Unlisted
) =>
{
t
}
_ => { _ => {
tracing::debug!(thought_id = %thought_id, "federation: skipping ThoughtCreated (remote or non-public)"); tracing::debug!(thought_id = %thought_id, "federation: skipping ThoughtCreated (remote or non-public)");
return Ok(()); return Ok(());
@@ -86,15 +82,7 @@ impl FederationEventService {
user_id, user_id,
} => { } => {
let thought = match self.thoughts.find_by_id(thought_id).await? { let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) Some(t) if should_broadcast(&t) => t,
if t.local
&& matches!(
t.visibility,
Visibility::Public | Visibility::Unlisted
) =>
{
t
}
_ => return Ok(()), _ => return Ok(()),
}; };
let user = match self.users.find_by_id(user_id).await? { let user = match self.users.find_by_id(user_id).await? {
@@ -125,14 +113,10 @@ impl FederationEventService {
user_id, user_id,
thought_id, thought_id,
} => { } => {
let booster = match self.users.find_by_id(user_id).await? { if !matches!(self.users.find_by_id(user_id).await?, Some(u) if u.local) {
Some(u) if u.local => u,
_ => {
tracing::debug!(user_id = %user_id, "federation: skipping BoostAdded (remote user)"); tracing::debug!(user_id = %user_id, "federation: skipping BoostAdded (remote user)");
return Ok(()); return Ok(());
} }
};
let _ = booster;
if self.thoughts.find_by_id(thought_id).await?.is_none() { if self.thoughts.find_by_id(thought_id).await?.is_none() {
return Ok(()); return Ok(());
} }
@@ -160,14 +144,10 @@ impl FederationEventService {
user_id, user_id,
thought_id, thought_id,
} => { } => {
let liker = match self.users.find_by_id(user_id).await? { if !matches!(self.users.find_by_id(user_id).await?, Some(u) if u.local) {
Some(u) if u.local => u,
_ => {
tracing::debug!(user_id = %user_id, "federation: skipping LikeAdded (remote user)"); tracing::debug!(user_id = %user_id, "federation: skipping LikeAdded (remote user)");
return Ok(()); return Ok(());
} }
};
let _ = liker;
let thought = match self.thoughts.find_by_id(thought_id).await? { let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) => t, Some(t) => t,
_ => return Ok(()), _ => return Ok(()),
@@ -193,11 +173,9 @@ impl FederationEventService {
user_id, user_id,
thought_id, thought_id,
} => { } => {
let liker = match self.users.find_by_id(user_id).await? { if !matches!(self.users.find_by_id(user_id).await?, Some(u) if u.local) {
Some(u) if u.local => u, return Ok(());
_ => return Ok(()), }
};
let _ = liker;
let thought = match self.thoughts.find_by_id(thought_id).await? { let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) => t, Some(t) => t,
_ => return Ok(()), _ => return Ok(()),

View File

@@ -118,17 +118,25 @@ fn mime_to_ext(mime: &str) -> Result<&'static str, DomainError> {
} }
} }
#[allow(clippy::too_many_arguments)] pub struct UploadContext<'a> {
pub users: &'a dyn UserRepository,
pub media: &'a dyn MediaStore,
pub events: &'a dyn EventPublisher,
pub upload_config: &'a UploadConfig,
pub base_url: &'a str,
}
async fn store_image( async fn store_image(
media: &dyn MediaStore, ctx: &UploadContext<'_>,
base_url: &str,
cfg: &UploadConfig,
content_type: &str, content_type: &str,
data: Bytes, data: Bytes,
user_id: &UserId, user_id: &UserId,
key_segment: &str, key_segment: &str,
old_url: Option<&str>, old_url: Option<&str>,
) -> Result<String, DomainError> { ) -> Result<String, DomainError> {
let cfg = ctx.upload_config;
let media = ctx.media;
let base_url = ctx.base_url;
if !cfg.allowed_content_types.iter().any(|t| t == content_type) { if !cfg.allowed_content_types.iter().any(|t| t == content_type) {
return Err(DomainError::InvalidInput("unsupported content type".into())); return Err(DomainError::InvalidInput("unsupported content type".into()));
} }
@@ -148,25 +156,19 @@ async fn store_image(
Ok(key) Ok(key)
} }
#[allow(clippy::too_many_arguments)]
pub async fn upload_avatar( pub async fn upload_avatar(
users: &dyn UserRepository, ctx: &UploadContext<'_>,
media: &dyn MediaStore,
events: &dyn EventPublisher,
user_id: &UserId, user_id: &UserId,
base_url: &str,
cfg: &UploadConfig,
content_type: &str, content_type: &str,
data: Bytes, data: Bytes,
) -> Result<(), DomainError> { ) -> Result<(), DomainError> {
let current = users let current = ctx
.users
.find_by_id(user_id) .find_by_id(user_id)
.await? .await?
.ok_or(DomainError::NotFound)?; .ok_or(DomainError::NotFound)?;
let key = store_image( let key = store_image(
media, ctx,
base_url,
cfg,
content_type, content_type,
data, data,
user_id, user_id,
@@ -174,41 +176,35 @@ pub async fn upload_avatar(
current.avatar_url.as_deref(), current.avatar_url.as_deref(),
) )
.await?; .await?;
users ctx.users
.update_profile( .update_profile(
user_id, user_id,
UpdateProfileInput { UpdateProfileInput {
avatar_url: Some(format!("{base_url}/media/{key}")), avatar_url: Some(format!("{}/media/{key}", ctx.base_url)),
..Default::default() ..Default::default()
}, },
) )
.await?; .await?;
events ctx.events
.publish(&DomainEvent::ProfileUpdated { .publish(&DomainEvent::ProfileUpdated {
user_id: user_id.clone(), user_id: user_id.clone(),
}) })
.await .await
} }
#[allow(clippy::too_many_arguments)]
pub async fn upload_banner( pub async fn upload_banner(
users: &dyn UserRepository, ctx: &UploadContext<'_>,
media: &dyn MediaStore,
events: &dyn EventPublisher,
user_id: &UserId, user_id: &UserId,
base_url: &str,
cfg: &UploadConfig,
content_type: &str, content_type: &str,
data: Bytes, data: Bytes,
) -> Result<(), DomainError> { ) -> Result<(), DomainError> {
let current = users let current = ctx
.users
.find_by_id(user_id) .find_by_id(user_id)
.await? .await?
.ok_or(DomainError::NotFound)?; .ok_or(DomainError::NotFound)?;
let key = store_image( let key = store_image(
media, ctx,
base_url,
cfg,
content_type, content_type,
data, data,
user_id, user_id,
@@ -216,16 +212,16 @@ pub async fn upload_banner(
current.header_url.as_deref(), current.header_url.as_deref(),
) )
.await?; .await?;
users ctx.users
.update_profile( .update_profile(
user_id, user_id,
UpdateProfileInput { UpdateProfileInput {
header_url: Some(format!("{base_url}/media/{key}")), header_url: Some(format!("{}/media/{key}", ctx.base_url)),
..Default::default() ..Default::default()
}, },
) )
.await?; .await?;
events ctx.events
.publish(&DomainEvent::ProfileUpdated { .publish(&DomainEvent::ProfileUpdated {
user_id: user_id.clone(), user_id: user_id.clone(),
}) })

View File

@@ -113,22 +113,29 @@ fn default_cfg() -> UploadConfig {
UploadConfig::default() UploadConfig::default()
} }
fn make_ctx<'a>(
store: &'a TestStore,
media: &'a MockMedia,
cfg: &'a UploadConfig,
) -> UploadContext<'a> {
UploadContext {
users: store,
media,
events: store,
upload_config: cfg,
base_url: "http://localhost",
}
}
#[tokio::test] #[tokio::test]
async fn upload_avatar_rejects_unsupported_mime() { async fn upload_avatar_rejects_unsupported_mime() {
let store = TestStore::default(); let store = TestStore::default();
let media = MockMedia::default(); let media = MockMedia::default();
let user = make_user(); let user = make_user();
store.users.lock().unwrap().push(user.clone()); store.users.lock().unwrap().push(user.clone());
let err = upload_avatar( let cfg = default_cfg();
&store, let ctx = make_ctx(&store, &media, &cfg);
&media, let err = upload_avatar(&ctx, &user.id, "text/plain", Bytes::from("hi"))
&store,
&user.id,
"http://localhost",
&default_cfg(),
"text/plain",
Bytes::from("hi"),
)
.await .await
.unwrap_err(); .unwrap_err();
assert!(matches!(err, DomainError::InvalidInput(_))); assert!(matches!(err, DomainError::InvalidInput(_)));
@@ -141,16 +148,9 @@ async fn upload_avatar_rejects_oversized_data() {
let user = make_user(); let user = make_user();
store.users.lock().unwrap().push(user.clone()); store.users.lock().unwrap().push(user.clone());
let big = Bytes::from(vec![0u8; 6 * 1024 * 1024]); let big = Bytes::from(vec![0u8; 6 * 1024 * 1024]);
let err = upload_avatar( let cfg = default_cfg();
&store, let ctx = make_ctx(&store, &media, &cfg);
&media, let err = upload_avatar(&ctx, &user.id, "image/jpeg", big)
&store,
&user.id,
"http://localhost",
&default_cfg(),
"image/jpeg",
big,
)
.await .await
.unwrap_err(); .unwrap_err();
assert!(matches!(err, DomainError::InvalidInput(_))); assert!(matches!(err, DomainError::InvalidInput(_)));
@@ -162,16 +162,9 @@ async fn upload_avatar_stores_file_and_updates_url() {
let media = MockMedia::default(); let media = MockMedia::default();
let user = make_user(); let user = make_user();
store.users.lock().unwrap().push(user.clone()); store.users.lock().unwrap().push(user.clone());
upload_avatar( let cfg = default_cfg();
&store, let ctx = make_ctx(&store, &media, &cfg);
&media, upload_avatar(&ctx, &user.id, "image/jpeg", Bytes::from("img"))
&store,
&user.id,
"http://localhost",
&default_cfg(),
"image/jpeg",
Bytes::from("img"),
)
.await .await
.unwrap(); .unwrap();
let key = format!("users/{}/avatar.jpg", user.id.as_uuid()); let key = format!("users/{}/avatar.jpg", user.id.as_uuid());
@@ -203,16 +196,9 @@ async fn upload_avatar_deletes_old_file_on_reupload() {
.lock() .lock()
.unwrap() .unwrap()
.insert(old_key.clone(), Bytes::from("old")); .insert(old_key.clone(), Bytes::from("old"));
upload_avatar( let cfg = default_cfg();
&store, let ctx = make_ctx(&store, &media, &cfg);
&media, upload_avatar(&ctx, &user.id, "image/jpeg", Bytes::from("new"))
&store,
&user.id,
"http://localhost",
&default_cfg(),
"image/jpeg",
Bytes::from("new"),
)
.await .await
.unwrap(); .unwrap();
assert!(!media.store.lock().unwrap().contains_key(&old_key)); assert!(!media.store.lock().unwrap().contains_key(&old_key));
@@ -225,16 +211,9 @@ async fn upload_banner_stores_file_and_updates_header_url() {
let media = MockMedia::default(); let media = MockMedia::default();
let user = make_user(); let user = make_user();
store.users.lock().unwrap().push(user.clone()); store.users.lock().unwrap().push(user.clone());
upload_banner( let cfg = default_cfg();
&store, let ctx = make_ctx(&store, &media, &cfg);
&media, upload_banner(&ctx, &user.id, "image/png", Bytes::from("banner"))
&store,
&user.id,
"http://localhost",
&default_cfg(),
"image/png",
Bytes::from("banner"),
)
.await .await
.unwrap(); .unwrap();
let key = format!("users/{}/banner.png", user.id.as_uuid()); let key = format!("users/{}/banner.png", user.id.as_uuid());
@@ -266,16 +245,9 @@ async fn upload_banner_deletes_old_file_on_reupload() {
.lock() .lock()
.unwrap() .unwrap()
.insert(old_key.clone(), Bytes::from("old")); .insert(old_key.clone(), Bytes::from("old"));
upload_banner( let cfg = default_cfg();
&store, let ctx = make_ctx(&store, &media, &cfg);
&media, upload_banner(&ctx, &user.id, "image/png", Bytes::from("new"))
&store,
&user.id,
"http://localhost",
&default_cfg(),
"image/png",
Bytes::from("new"),
)
.await .await
.unwrap(); .unwrap();
assert!(!media.store.lock().unwrap().contains_key(&old_key)); assert!(!media.store.lock().unwrap().contains_key(&old_key));

View File

@@ -8,7 +8,7 @@ use std::sync::Arc;
use application::use_cases::profile::UploadConfig; use application::use_cases::profile::UploadConfig;
use storage::{build_store, ObjectStorageAdapter, StorageConfig}; use storage::{build_store, ObjectStorageAdapter, StorageConfig};
use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; use activitypub::{build_ap_service, ApFederationAdapter, ApServiceConfig, ThoughtsObjectHandler};
use auth::ApiKeyServiceImpl; use auth::ApiKeyServiceImpl;
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
@@ -16,13 +16,13 @@ use domain::{
ports::{EventPublisher, OutboxWriter}, ports::{EventPublisher, OutboxWriter},
}; };
use event_transport::{EventPublisherAdapter, Transport}; use event_transport::{EventPublisherAdapter, Transport};
use k_ap::{ActivityPubService, FederationEvent}; use k_ap::FederationEvent;
use nats::NatsTransport; use nats::NatsTransport;
use postgres::activitypub::PgActivityPubRepository; use postgres::activitypub::PgActivityPubRepository;
use postgres::engagement::PgEngagementRepository; use postgres::engagement::PgEngagementRepository;
use postgres::outbox::PgOutboxWriter; use postgres::outbox::PgOutboxWriter;
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository; use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use postgres_federation::{PgApUserRepository, PgFederationRepository};
use presentation::state::AppState; use presentation::state::AppState;
use crate::config::Config; use crate::config::Config;
@@ -125,7 +125,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
// 3. ActivityPub federation // 3. ActivityPub federation
let connections_repo = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); let connections_repo = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()));
let fed_repo = Arc::new(PostgresFederationRepository::new(pool.clone())); let fed_repo = Arc::new(PgFederationRepository::new(pool.clone()));
let likes: Arc<dyn domain::ports::LikeRepository> = let likes: Arc<dyn domain::ports::LikeRepository> =
Arc::new(postgres::like::PgLikeRepository::new(pool.clone())); Arc::new(postgres::like::PgLikeRepository::new(pool.clone()));
let boosts: Arc<dyn domain::ports::BoostRepository> = let boosts: Arc<dyn domain::ports::BoostRepository> =
@@ -138,30 +138,20 @@ pub async fn build(cfg: &Config) -> Infrastructure {
likes.clone(), likes.clone(),
boosts.clone(), boosts.clone(),
)); ));
let mut ap_builder = ActivityPubService::builder(cfg.base_url.clone()) let (_raw, ap_service) = build_ap_service(ApServiceConfig {
.activity_repo(fed_repo.clone()) base_url: cfg.base_url.clone(),
.follow_repo(fed_repo.clone()) activity_repo: fed_repo.clone(),
.actor_repo(fed_repo.clone()) follow_repo: fed_repo.clone(),
.blocklist_repo(fed_repo.clone()) actor_repo: fed_repo.clone(),
.user_repo(Arc::new(PostgresApUserRepository::new( blocklist_repo: fed_repo.clone(),
pool.clone(), user_repo: Arc::new(PgApUserRepository::new(pool.clone(), cfg.base_url.clone())),
cfg.base_url.clone(), ap_handler,
))) connections_repo,
.content_reader(ap_handler.clone()) event_publisher: kap_publisher,
.object_handler(ap_handler) allow_registration: cfg.allow_registration,
.allow_registration(cfg.allow_registration) debug: cfg.debug,
.software_name("thoughts") })
.debug(cfg.debug); .await;
if let Some(publisher) = kap_publisher {
ap_builder = ap_builder.event_publisher(publisher);
}
let raw_ap_service = Arc::new(
ap_builder
.build()
.await
.expect("Failed to build ActivityPubService"),
);
let ap_service = Arc::new(ApFederationAdapter::new(raw_ap_service, connections_repo));
// 4. Storage adapter // 4. Storage adapter
let storage_cfg = StorageConfig { let storage_cfg = StorageConfig {

View File

@@ -89,7 +89,7 @@ impl Email {
} }
} }
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PasswordHash(pub String); pub struct PasswordHash(pub String);
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]

View File

@@ -37,11 +37,13 @@ pub async fn post_api_key(
Deps(d): Deps<ApiKeysDeps>, Deps(d): Deps<ApiKeysDeps>,
AuthUser(uid): AuthUser, AuthUser(uid): AuthUser,
Json(body): Json<CreateApiKeyRequest>, Json(body): Json<CreateApiKeyRequest>,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<CreatedApiKeyResponse>, ApiError> {
let (key, raw) = create_api_key(&*d.api_keys, &uid, body.name).await?; let (key, raw) = create_api_key(&*d.api_keys, &uid, body.name).await?;
Ok(Json( Ok(Json(CreatedApiKeyResponse {
serde_json::json!({ "id": key.id.as_uuid(), "name": key.name, "key": raw }), id: key.id.as_uuid(),
)) name: key.name,
key: raw,
}))
} }
#[utoipa::path(delete, path = "/api-keys/{id}", params(("id" = uuid::Uuid, Path, description = "Key ID")), responses((status = 204, description = "Deleted")), security(("bearer_auth" = [])))] #[utoipa::path(delete, path = "/api-keys/{id}", params(("id" = uuid::Uuid, Path, description = "Key ID")), responses((status = 204, description = "Deleted")), security(("bearer_auth" = [])))]
pub async fn delete_api_key_handler( pub async fn delete_api_key_handler(

View File

@@ -5,7 +5,7 @@ use crate::{
handlers::auth::to_user_response, handlers::auth::to_user_response,
}; };
use api_types::requests::{PaginationQuery, SearchQuery}; use api_types::requests::{PaginationQuery, SearchQuery};
use api_types::responses::ThoughtResponse; use api_types::responses::{PagedResponse, ThoughtResponse};
use application::use_cases::feed::{ use application::use_cases::feed::{
get_home_feed, get_popular_tags as uc_get_popular_tags, get_public_feed, get_tag_feed, get_home_feed, get_popular_tags as uc_get_popular_tags, get_public_feed, get_tag_feed,
get_user_feed, get_user_feed,
@@ -75,6 +75,13 @@ impl TryFrom<FeedOptionsQuery> for FeedOptions {
} }
} }
fn wants_activity_json(headers: &HeaderMap) -> bool {
headers
.get(header::ACCEPT)
.and_then(|v| v.to_str().ok())
.is_some_and(|a| a.contains("application/activity+json"))
}
deps_struct!(FeedDeps { deps_struct!(FeedDeps {
feed: FeedRepository, feed: FeedRepository,
follows: FollowRepository, follows: FollowRepository,
@@ -116,19 +123,19 @@ pub async fn home_feed(
AuthUser(uid): AuthUser, AuthUser(uid): AuthUser,
Query(q): Query<PaginationQuery>, Query(q): Query<PaginationQuery>,
Query(opts_q): Query<FeedOptionsQuery>, Query(opts_q): Query<FeedOptionsQuery>,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<PagedResponse<ThoughtResponse>>, ApiError> {
let page = PageParams { let page = PageParams {
page: q.page(), page: q.page(),
per_page: q.per_page(), per_page: q.per_page(),
}; };
let opts = FeedOptions::try_from(opts_q)?; let opts = FeedOptions::try_from(opts_q)?;
let result = get_home_feed(&*d.feed, &*d.follows, &uid, page, opts).await?; let result = get_home_feed(&*d.feed, &*d.follows, &uid, page, opts).await?;
Ok(Json(serde_json::json!({ Ok(Json(PagedResponse {
"items": result.items.iter().map(to_thought_response).collect::<Vec<_>>(), items: result.items.iter().map(to_thought_response).collect(),
"total": result.total, total: result.total,
"page": result.page, page: result.page,
"per_page": result.per_page, per_page: result.per_page,
}))) }))
} }
#[utoipa::path( #[utoipa::path(
@@ -141,19 +148,19 @@ pub async fn public_feed(
OptionalAuthUser(viewer): OptionalAuthUser, OptionalAuthUser(viewer): OptionalAuthUser,
Query(q): Query<PaginationQuery>, Query(q): Query<PaginationQuery>,
Query(opts_q): Query<FeedOptionsQuery>, Query(opts_q): Query<FeedOptionsQuery>,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<PagedResponse<ThoughtResponse>>, ApiError> {
let page = PageParams { let page = PageParams {
page: q.page(), page: q.page(),
per_page: q.per_page(), per_page: q.per_page(),
}; };
let opts = FeedOptions::try_from(opts_q)?; let opts = FeedOptions::try_from(opts_q)?;
let result = get_public_feed(&*d.feed, viewer, page, opts).await?; let result = get_public_feed(&*d.feed, viewer, page, opts).await?;
Ok(Json(serde_json::json!({ Ok(Json(PagedResponse {
"items": result.items.iter().map(to_thought_response).collect::<Vec<_>>(), items: result.items.iter().map(to_thought_response).collect(),
"total": result.total, total: result.total,
"page": result.page, page: result.page,
"per_page": result.per_page, per_page: result.per_page,
}))) }))
} }
#[utoipa::path( #[utoipa::path(
@@ -210,12 +217,7 @@ pub async fn get_following_handler(
Query(q): Query<PaginationQuery>, Query(q): Query<PaginationQuery>,
headers: HeaderMap, headers: HeaderMap,
) -> Result<Response, ApiError> { ) -> Result<Response, ApiError> {
let accept = headers if wants_activity_json(&headers) {
.get(header::ACCEPT)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if accept.contains("application/activity+json") {
let user = get_user_by_id_or_username(&*d.users, &param).await?; let user = get_user_by_id_or_username(&*d.users, &param).await?;
let user_id = user.id; let user_id = user.id;
let page = q.page().try_into().ok(); let page = q.page().try_into().ok();
@@ -232,10 +234,12 @@ pub async fn get_following_handler(
per_page: q.per_page(), per_page: q.per_page(),
}; };
let result = list_local_following(&*d.follows, &user.id, page).await?; let result = list_local_following(&*d.follows, &user.id, page).await?;
Ok(Json(serde_json::json!({ Ok(Json(PagedResponse {
"total": result.total, items: result.items.iter().map(to_user_response).collect(),
"items": result.items.iter().map(to_user_response).collect::<Vec<_>>() total: result.total,
})) page: result.page,
per_page: result.per_page,
})
.into_response()) .into_response())
} }
@@ -253,12 +257,7 @@ pub async fn get_followers_handler(
Query(q): Query<PaginationQuery>, Query(q): Query<PaginationQuery>,
headers: HeaderMap, headers: HeaderMap,
) -> Result<Response, ApiError> { ) -> Result<Response, ApiError> {
let accept = headers if wants_activity_json(&headers) {
.get(header::ACCEPT)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if accept.contains("application/activity+json") {
let user = get_user_by_id_or_username(&*d.users, &param).await?; let user = get_user_by_id_or_username(&*d.users, &param).await?;
let user_id = user.id; let user_id = user.id;
let page = q.page().try_into().ok(); let page = q.page().try_into().ok();
@@ -275,10 +274,12 @@ pub async fn get_followers_handler(
per_page: q.per_page(), per_page: q.per_page(),
}; };
let result = list_local_followers(&*d.follows, &user.id, page).await?; let result = list_local_followers(&*d.follows, &user.id, page).await?;
Ok(Json(serde_json::json!({ Ok(Json(PagedResponse {
"total": result.total, items: result.items.iter().map(to_user_response).collect(),
"items": result.items.iter().map(to_user_response).collect::<Vec<_>>() total: result.total,
})) page: result.page,
per_page: result.per_page,
})
.into_response()) .into_response())
} }
@@ -297,7 +298,7 @@ pub async fn user_thoughts_handler(
OptionalAuthUser(viewer): OptionalAuthUser, OptionalAuthUser(viewer): OptionalAuthUser,
Query(q): Query<PaginationQuery>, Query(q): Query<PaginationQuery>,
Query(opts_q): Query<FeedOptionsQuery>, Query(opts_q): Query<FeedOptionsQuery>,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<PagedResponse<ThoughtResponse>>, ApiError> {
let user = get_user_by_username(&*d.users, &username).await?; let user = get_user_by_username(&*d.users, &username).await?;
let page = PageParams { let page = PageParams {
page: q.page(), page: q.page(),
@@ -305,12 +306,12 @@ pub async fn user_thoughts_handler(
}; };
let opts = FeedOptions::try_from(opts_q)?; let opts = FeedOptions::try_from(opts_q)?;
let result = get_user_feed(&*d.feed, user.id.clone(), page, opts, viewer).await?; let result = get_user_feed(&*d.feed, user.id.clone(), page, opts, viewer).await?;
Ok(Json(serde_json::json!({ Ok(Json(PagedResponse {
"total": result.total, items: result.items.iter().map(to_thought_response).collect(),
"page": result.page, total: result.total,
"per_page": result.per_page, page: result.page,
"items": result.items.iter().map(to_thought_response).collect::<Vec<_>>() per_page: result.per_page,
}))) }))
} }
#[utoipa::path( #[utoipa::path(
@@ -353,18 +354,17 @@ pub async fn tag_thoughts_handler(
OptionalAuthUser(viewer): OptionalAuthUser, OptionalAuthUser(viewer): OptionalAuthUser,
Query(q): Query<PaginationQuery>, Query(q): Query<PaginationQuery>,
Query(opts_q): Query<FeedOptionsQuery>, Query(opts_q): Query<FeedOptionsQuery>,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<PagedResponse<ThoughtResponse>>, ApiError> {
let page = PageParams { let page = PageParams {
page: q.page(), page: q.page(),
per_page: q.per_page(), per_page: q.per_page(),
}; };
let opts = FeedOptions::try_from(opts_q)?; let opts = FeedOptions::try_from(opts_q)?;
let result = get_tag_feed(&*d.feed, &tag_name, page, opts, viewer).await?; let result = get_tag_feed(&*d.feed, &tag_name, page, opts, viewer).await?;
Ok(Json(serde_json::json!({ Ok(Json(PagedResponse {
"tag": tag_name, items: result.items.iter().map(to_thought_response).collect(),
"total": result.total, total: result.total,
"page": result.page, page: result.page,
"per_page": result.per_page, per_page: result.per_page,
"items": result.items.iter().map(to_thought_response).collect::<Vec<_>>(), }))
})))
} }

View File

@@ -3,7 +3,7 @@ use crate::{
errors::ApiError, errors::ApiError,
extractors::{AuthUser, Deps}, extractors::{AuthUser, Deps},
}; };
use api_types::requests::NotificationUpdateRequest; use api_types::{requests::NotificationUpdateRequest, responses::NotificationSummaryResponse};
use application::use_cases::notifications::{ use application::use_cases::notifications::{
count_unread_notifications, list_notifications as uc_list_notifications, count_unread_notifications, list_notifications as uc_list_notifications,
mark_all_notifications_read, mark_notification_read as uc_mark_notification_read, mark_all_notifications_read, mark_notification_read as uc_mark_notification_read,
@@ -22,17 +22,17 @@ deps_struct!(NotificationsDeps {
pub async fn list_notifications( pub async fn list_notifications(
Deps(d): Deps<NotificationsDeps>, Deps(d): Deps<NotificationsDeps>,
AuthUser(uid): AuthUser, AuthUser(uid): AuthUser,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<NotificationSummaryResponse>, ApiError> {
let page = PageParams { let page = PageParams {
page: 1, page: 1,
per_page: 20, per_page: 20,
}; };
let result = uc_list_notifications(&*d.notifications, &uid, page).await?; let result = uc_list_notifications(&*d.notifications, &uid, page).await?;
let unread = count_unread_notifications(&*d.notifications, &uid).await?; let unread = count_unread_notifications(&*d.notifications, &uid).await?;
Ok(Json(serde_json::json!({ Ok(Json(NotificationSummaryResponse {
"total": result.total, total: result.total,
"unread": unread unread,
}))) }))
} }
#[utoipa::path(patch, path = "/notifications/{id}", params(("id" = uuid::Uuid, Path, description = "Notification ID")), request_body = NotificationUpdateRequest, responses((status = 204, description = "Marked read")), security(("bearer_auth" = [])))] #[utoipa::path(patch, path = "/notifications/{id}", params(("id" = uuid::Uuid, Path, description = "Notification ID")), request_body = NotificationUpdateRequest, responses((status = 204, description = "Marked read")), security(("bearer_auth" = [])))]

View File

@@ -6,12 +6,12 @@ use crate::{
}; };
use api_types::{ use api_types::{
requests::{PaginationQuery, UpdateProfileRequest}, requests::{PaginationQuery, UpdateProfileRequest},
responses::{ErrorResponse, ProfileField, RemoteActorResponse, UserResponse}, responses::{ErrorResponse, PagedResponse, ProfileField, RemoteActorResponse, UserResponse},
}; };
use application::use_cases::profile::{ use application::use_cases::profile::{
count_local_users, get_user as fetch_user, get_user_by_id_or_username, get_user_profile, count_local_users, get_user as fetch_user, get_user_by_id_or_username, get_user_profile,
list_local_following, list_users, update_profile, upload_avatar as upload_avatar_uc, list_local_following, list_users, update_profile, upload_avatar as upload_avatar_uc,
upload_banner as upload_banner_uc, UploadConfig, upload_banner as upload_banner_uc, UploadConfig, UploadContext,
}; };
use axum::{ use axum::{
extract::{Multipart, Path, Query}, extract::{Multipart, Path, Query},
@@ -54,6 +54,13 @@ impl FromAppState for UsersDeps {
} }
} }
fn wants_activity_json(headers: &HeaderMap) -> bool {
headers
.get(header::ACCEPT)
.and_then(|v| v.to_str().ok())
.is_some_and(|a| a.contains("application/activity+json"))
}
#[utoipa::path( #[utoipa::path(
get, path = "/users/{username}", get, path = "/users/{username}",
params(("username" = String, Path, description = "Username")), params(("username" = String, Path, description = "Username")),
@@ -68,12 +75,7 @@ pub async fn get_user(
OptionalAuthUser(viewer): OptionalAuthUser, OptionalAuthUser(viewer): OptionalAuthUser,
headers: HeaderMap, headers: HeaderMap,
) -> Result<Response, ApiError> { ) -> Result<Response, ApiError> {
let accept = headers if wants_activity_json(&headers) {
.get(header::ACCEPT)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if accept.contains("application/activity+json") {
let user = get_user_by_id_or_username(&*d.users, &username).await?; let user = get_user_by_id_or_username(&*d.users, &username).await?;
let json = d.federation.actor_json(&user.id).await?; let json = d.federation.actor_json(&user.id).await?;
Ok(([(header::CONTENT_TYPE, "application/activity+json")], json).into_response()) Ok(([(header::CONTENT_TYPE, "application/activity+json")], json).into_response())
@@ -145,17 +147,19 @@ pub async fn get_me_following(
Deps(d): Deps<UsersDeps>, Deps(d): Deps<UsersDeps>,
AuthUser(uid): AuthUser, AuthUser(uid): AuthUser,
Query(q): Query<PaginationQuery>, Query(q): Query<PaginationQuery>,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<PagedResponse<UserResponse>>, ApiError> {
use domain::models::feed::PageParams; use domain::models::feed::PageParams;
let page = PageParams { let page = PageParams {
page: q.page(), page: q.page(),
per_page: q.per_page(), per_page: q.per_page(),
}; };
let result = list_local_following(&*d.follows, &uid, page).await?; let result = list_local_following(&*d.follows, &uid, page).await?;
Ok(Json(serde_json::json!({ Ok(Json(PagedResponse {
"total": result.total, items: result.items.iter().map(to_user_response).collect(),
"items": result.items.iter().map(to_user_response).collect::<Vec<_>>(), total: result.total,
}))) page: result.page,
per_page: result.per_page,
}))
} }
#[utoipa::path( #[utoipa::path(
@@ -170,7 +174,7 @@ pub async fn get_me_following(
pub async fn get_users( pub async fn get_users(
Deps(d): Deps<UsersDeps>, Deps(d): Deps<UsersDeps>,
Query(params): Query<std::collections::HashMap<String, String>>, Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<PagedResponse<UserResponse>>, ApiError> {
use domain::models::feed::PageParams; use domain::models::feed::PageParams;
let page = params let page = params
.get("page") .get("page")
@@ -184,38 +188,37 @@ pub async fn get_users(
if let Some(q) = params.get("q").filter(|q| !q.trim().is_empty()) { if let Some(q) = params.get("q").filter(|q| !q.trim().is_empty()) {
let result = d.search.search_users(q, &page_params).await?; let result = d.search.search_users(q, &page_params).await?;
let users: Vec<_> = result return Ok(Json(PagedResponse {
.items items: result.items.iter().map(to_user_response).collect(),
.iter() total: result.total,
.map(crate::handlers::auth::to_user_response) page: result.page,
.collect(); per_page: result.per_page,
return Ok(Json(serde_json::json!({ }));
"items": users, "total": result.total, "page": result.page, "per_page": result.per_page
})));
} }
let result = list_users(&*d.users, page_params).await?; let result = list_users(&*d.users, page_params).await?;
let items: Vec<_> = result let items: Vec<UserResponse> = result
.items .items
.iter() .iter()
.map(|u| { .map(|u| UserResponse {
serde_json::json!({ id: u.id.as_uuid(),
"id": u.id.as_uuid(), username: u.username.clone(),
"username": u.username, display_name: u.display_name.clone(),
"displayName": u.display_name, bio: u.bio.clone(),
"avatarUrl": u.avatar_url, avatar_url: u.avatar_url.clone(),
"bio": u.bio, header_url: None,
"headerUrl": null, custom_css: None,
"customCss": null, local: true,
"local": true, is_followed_by_viewer: false,
"isFollowedByViewer": false, created_at: chrono::Utc::now(),
"joinedAt": null,
})
}) })
.collect(); .collect();
Ok(Json(serde_json::json!({ Ok(Json(PagedResponse {
"items": items, "total": result.total, "page": result.page, "per_page": result.per_page items,
}))) total: result.total,
page: result.page,
per_page: result.per_page,
}))
} }
#[utoipa::path( #[utoipa::path(
@@ -266,6 +269,25 @@ pub async fn lookup_handler(
})) }))
} }
async fn extract_upload_field(
mut multipart: Multipart,
) -> Result<(String, axum::body::Bytes), ApiError> {
let field = multipart
.next_field()
.await
.map_err(|_| ApiError::BadRequest("invalid multipart".into()))?
.ok_or_else(|| ApiError::BadRequest("no file field".into()))?;
let content_type = field
.content_type()
.ok_or_else(|| ApiError::BadRequest("missing content-type on field".into()))?
.to_string();
let data = field
.bytes()
.await
.map_err(|_| ApiError::BadRequest("failed to read upload".into()))?;
Ok((content_type, data))
}
#[utoipa::path( #[utoipa::path(
put, path = "/users/me/avatar", put, path = "/users/me/avatar",
request_body(content = String, content_type = "multipart/form-data", description = "Image file (JPEG, PNG, WebP, AVIF, GIF)"), request_body(content = String, content_type = "multipart/form-data", description = "Image file (JPEG, PNG, WebP, AVIF, GIF)"),
@@ -278,35 +300,17 @@ pub async fn lookup_handler(
pub async fn upload_avatar( pub async fn upload_avatar(
Deps(d): Deps<UsersDeps>, Deps(d): Deps<UsersDeps>,
AuthUser(uid): AuthUser, AuthUser(uid): AuthUser,
mut multipart: Multipart, multipart: Multipart,
) -> Result<Json<UserResponse>, ApiError> { ) -> Result<Json<UserResponse>, ApiError> {
let field = multipart let (content_type, data) = extract_upload_field(multipart).await?;
.next_field() let ctx = UploadContext {
.await users: &*d.users,
.map_err(|_| ApiError::BadRequest("invalid multipart".into()))? media: &*d.media,
.ok_or_else(|| ApiError::BadRequest("no file field".into()))?; events: &*d.events,
// Content-type is client-supplied; the use-case allowlist prevents obviously upload_config: &d.upload_config,
// wrong types, but magic-byte validation is not performed. Serve media files base_url: &d.base_url,
// from an isolated origin to prevent MIME-based XSS. };
let content_type = field upload_avatar_uc(&ctx, &uid, &content_type, data).await?;
.content_type()
.ok_or_else(|| ApiError::BadRequest("missing content-type on field".into()))?
.to_string();
let data = field
.bytes()
.await
.map_err(|_| ApiError::BadRequest("failed to read upload".into()))?;
upload_avatar_uc(
&*d.users,
&*d.media,
&*d.events,
&uid,
&d.base_url,
&d.upload_config,
&content_type,
data,
)
.await?;
let user = fetch_user(&*d.users, &uid).await?; let user = fetch_user(&*d.users, &uid).await?;
Ok(Json(to_user_response(&user))) Ok(Json(to_user_response(&user)))
} }
@@ -323,35 +327,17 @@ pub async fn upload_avatar(
pub async fn upload_banner( pub async fn upload_banner(
Deps(d): Deps<UsersDeps>, Deps(d): Deps<UsersDeps>,
AuthUser(uid): AuthUser, AuthUser(uid): AuthUser,
mut multipart: Multipart, multipart: Multipart,
) -> Result<Json<UserResponse>, ApiError> { ) -> Result<Json<UserResponse>, ApiError> {
let field = multipart let (content_type, data) = extract_upload_field(multipart).await?;
.next_field() let ctx = UploadContext {
.await users: &*d.users,
.map_err(|_| ApiError::BadRequest("invalid multipart".into()))? media: &*d.media,
.ok_or_else(|| ApiError::BadRequest("no file field".into()))?; events: &*d.events,
// Content-type is client-supplied; the use-case allowlist prevents obviously upload_config: &d.upload_config,
// wrong types, but magic-byte validation is not performed. Serve media files base_url: &d.base_url,
// from an isolated origin to prevent MIME-based XSS. };
let content_type = field upload_banner_uc(&ctx, &uid, &content_type, data).await?;
.content_type()
.ok_or_else(|| ApiError::BadRequest("missing content-type on field".into()))?
.to_string();
let data = field
.bytes()
.await
.map_err(|_| ApiError::BadRequest("failed to read upload".into()))?;
upload_banner_uc(
&*d.users,
&*d.media,
&*d.events,
&uid,
&d.base_url,
&d.upload_config,
&content_type,
data,
)
.await?;
let user = fetch_user(&*d.users, &uid).await?; let user = fetch_user(&*d.users, &uid).await?;
Ok(Json(to_user_response(&user))) Ok(Json(to_user_response(&user)))
} }

View File

@@ -31,73 +31,60 @@ impl PasswordHasher for NoOpHasher {
} }
} }
/// No-op ActivityPubRepository for presentation layer tests.
pub struct NoOpApRepo; pub struct NoOpApRepo;
#[async_trait] #[async_trait]
impl ActivityPubRepository for NoOpApRepo { impl ActivityPubRepository for NoOpApRepo {
async fn outbox_entries_for_actor( async fn outbox_entries_for_actor(&self, _: &UserId) -> Result<Vec<OutboxEntry>, DomainError> {
&self,
_uid: &UserId,
) -> Result<Vec<OutboxEntry>, DomainError> {
Ok(vec![]) Ok(vec![])
} }
async fn outbox_page_for_actor( async fn outbox_page_for_actor(
&self, &self,
_uid: &UserId, _: &UserId,
_before: Option<chrono::DateTime<chrono::Utc>>, _: Option<chrono::DateTime<chrono::Utc>>,
_limit: usize, _: usize,
) -> Result<Vec<OutboxEntry>, DomainError> { ) -> Result<Vec<OutboxEntry>, DomainError> {
Ok(vec![]) Ok(vec![])
} }
async fn find_remote_actor_id( async fn find_remote_actor_id(&self, _: &str) -> Result<Option<UserId>, DomainError> {
&self,
_actor_ap_url: &str,
) -> Result<Option<UserId>, DomainError> {
Ok(None) Ok(None)
} }
async fn intern_remote_actor(&self, _actor_ap_url: &str) -> Result<UserId, DomainError> { async fn intern_remote_actor(&self, _: &str) -> Result<UserId, DomainError> {
Err(DomainError::NotFound) Err(DomainError::NotFound)
} }
async fn update_remote_actor_display( async fn update_remote_actor_display(
&self, &self,
_user_id: &UserId, _: &UserId,
_display_name: Option<&str>, _: Option<&str>,
_avatar_url: Option<&str>, _: Option<&str>,
) -> Result<(), DomainError> { ) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn accept_note( async fn accept_note(
&self, &self,
_input: activitypub::AcceptNoteInput<'_>, _: activitypub::AcceptNoteInput<'_>,
) -> Result<ThoughtId, DomainError> { ) -> Result<ThoughtId, DomainError> {
Ok(ThoughtId::from_uuid(uuid::Uuid::new_v4())) Ok(ThoughtId::from_uuid(uuid::Uuid::new_v4()))
} }
async fn apply_note_update(&self, _ap_id: &str, _new_content: &str) -> Result<(), DomainError> { async fn apply_note_update(&self, _: &str, _: &str) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn retract_note(&self, _ap_id: &str) -> Result<(), DomainError> { async fn retract_note(&self, _: &str) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn retract_actor_notes(&self, _actor_ap_url: &str) -> Result<(), DomainError> { async fn retract_actor_notes(&self, _: &str) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn count_local_notes(&self) -> Result<u64, DomainError> { async fn count_local_notes(&self) -> Result<u64, DomainError> {
Ok(0) Ok(0)
} }
async fn get_thought_ap_id( async fn get_thought_ap_id(&self, _: &ThoughtId) -> Result<Option<String>, DomainError> {
&self,
_thought_id: &ThoughtId,
) -> Result<Option<String>, DomainError> {
Ok(None) Ok(None)
} }
async fn get_actor_ap_urls( async fn get_actor_ap_urls(&self, _: &UserId) -> Result<Option<ActorApUrls>, DomainError> {
&self,
_user_id: &UserId,
) -> Result<Option<ActorApUrls>, DomainError> {
Ok(None) Ok(None)
} }
async fn sync_remote_actor_to_user(&self, _actor_ap_url: &str) -> Result<(), DomainError> { async fn sync_remote_actor_to_user(&self, _: &str) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
} }

View File

@@ -3,15 +3,16 @@ use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;
use activitypub::{ActivityPubRepository, OutboundFederationPort}; use activitypub::ThoughtsObjectHandler;
use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; use activitypub::{
build_ap_service, ActivityPubRepository, ApServiceConfig, OutboundFederationPort,
};
use application::services::{ use application::services::{
FederationEventService, FederationManagementEventService, NotificationEventService, FederationEventService, FederationManagementEventService, NotificationEventService,
}; };
use domain::ports::EventPublisher; use domain::ports::EventPublisher;
use k_ap::ActivityPubService;
use postgres::activitypub::PgActivityPubRepository; use postgres::activitypub::PgActivityPubRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use postgres_federation::{PgApUserRepository, PgFederationRepository};
use crate::handlers::{FederationHandler, FederationManagementHandler, NotificationHandler}; use crate::handlers::{FederationHandler, FederationManagementHandler, NotificationHandler};
@@ -44,7 +45,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
// ActivityPub service (for federation fan-out) // ActivityPub service (for federation fan-out)
let connections_repo_worker = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); let connections_repo_worker = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()));
let fed_repo_worker = Arc::new(PostgresFederationRepository::new(pool.clone())); let fed_repo_worker = Arc::new(PgFederationRepository::new(pool.clone()));
let ap_handler_worker = Arc::new(ThoughtsObjectHandler::new( let ap_handler_worker = Arc::new(ThoughtsObjectHandler::new(
Arc::new(PgActivityPubRepository::new(pool.clone())), Arc::new(PgActivityPubRepository::new(pool.clone())),
base_url, base_url,
@@ -53,27 +54,20 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
Arc::new(postgres::like::PgLikeRepository::new(pool.clone())), Arc::new(postgres::like::PgLikeRepository::new(pool.clone())),
Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())), Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())),
)); ));
let raw_ap_service = Arc::new( let (raw_ap_service, ap_service) = build_ap_service(ApServiceConfig {
ActivityPubService::builder(base_url.to_string()) base_url: base_url.to_string(),
.activity_repo(fed_repo_worker.clone()) activity_repo: fed_repo_worker.clone(),
.follow_repo(fed_repo_worker.clone()) follow_repo: fed_repo_worker.clone(),
.actor_repo(fed_repo_worker.clone()) actor_repo: fed_repo_worker.clone(),
.blocklist_repo(fed_repo_worker.clone()) blocklist_repo: fed_repo_worker.clone(),
.user_repo(Arc::new(PostgresApUserRepository::new( user_repo: Arc::new(PgApUserRepository::new(pool.clone(), base_url.to_string())),
pool.clone(), ap_handler: ap_handler_worker,
base_url.to_string(), connections_repo: connections_repo_worker,
))) event_publisher: None,
.content_reader(ap_handler_worker.clone()) allow_registration: false,
.object_handler(ap_handler_worker) debug: false,
.software_name("thoughts") })
.build() .await;
.await
.expect("ActivityPubService build failed"),
);
let ap_service = Arc::new(ApFederationAdapter::new(
raw_ap_service.clone(),
connections_repo_worker,
));
let ap_outbound = ap_service.clone() as Arc<dyn OutboundFederationPort>; let ap_outbound = ap_service.clone() as Arc<dyn OutboundFederationPort>;
let ap_repo_worker = let ap_repo_worker =
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>; Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>;