diff --git a/crates/adapters/postgres/src/api_key.rs b/crates/adapters/postgres/src/api_key.rs index 5c0089f..df9054e 100644 --- a/crates/adapters/postgres/src/api_key.rs +++ b/crates/adapters/postgres/src/api_key.rs @@ -1,2 +1,73 @@ -pub struct PgApiKeyRepository { _pool: sqlx::PgPool } -impl PgApiKeyRepository { pub fn new(pool: sqlx::PgPool) -> Self { Self { _pool: pool } } } +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use domain::{errors::DomainError, models::api_key::ApiKey, ports::ApiKeyRepository, value_objects::{ApiKeyId, UserId}}; + +pub struct PgApiKeyRepository { pool: PgPool } +impl PgApiKeyRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } + +#[async_trait] +impl ApiKeyRepository for PgApiKeyRepository { + async fn save(&self, k: &ApiKey) -> Result<(), DomainError> { + sqlx::query("INSERT INTO api_keys(id,user_id,key_hash,name,created_at) VALUES($1,$2,$3,$4,$5)") + .bind(k.id.as_uuid()).bind(k.user_id.as_uuid()).bind(&k.key_hash).bind(&k.name).bind(k.created_at) + .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + } + + async fn find_by_hash(&self, hash: &str) -> Result, DomainError> { + #[derive(sqlx::FromRow)] struct Row { id: uuid::Uuid, user_id: uuid::Uuid, key_hash: String, name: String, created_at: DateTime } + sqlx::query_as::<_, Row>("SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE key_hash=$1") + .bind(hash).fetch_optional(&self.pool).await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|o| o.map(|r| ApiKey { id: ApiKeyId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), key_hash: r.key_hash, name: r.name, created_at: r.created_at })) + } + + async fn list_for_user(&self, user_id: &UserId) -> Result, DomainError> { + #[derive(sqlx::FromRow)] struct Row { id: uuid::Uuid, user_id: uuid::Uuid, key_hash: String, name: String, created_at: DateTime } + sqlx::query_as::<_, Row>("SELECT id,user_id,key_hash,name,created_at FROM api_keys WHERE user_id=$1 ORDER BY created_at DESC") + .bind(user_id.as_uuid()).fetch_all(&self.pool).await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|rows| rows.into_iter().map(|r| ApiKey { id: ApiKeyId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), key_hash: r.key_hash, name: r.name, created_at: r.created_at }).collect()) + } + + async fn delete(&self, id: &ApiKeyId, user_id: &UserId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM api_keys WHERE id=$1 AND user_id=$2") + .bind(id.as_uuid()).bind(user_id.as_uuid()) + .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use domain::{models::user::User, value_objects::*}; + use crate::user::PgUserRepository; + use domain::ports::UserRepository; + + async fn seed_user(pool: &sqlx::PgPool) -> User { + let repo = PgUserRepository::new(pool.clone()); + let u = User::new_local(UserId::new(), Username::new("alice").unwrap(), Email::new("alice@ex.com").unwrap(), PasswordHash("h".into())); + repo.save(&u).await.unwrap(); u + } + + #[sqlx::test(migrations = "./migrations")] + async fn save_and_find_by_hash(pool: sqlx::PgPool) { + let user = seed_user(&pool).await; + let repo = PgApiKeyRepository::new(pool); + let key = ApiKey { id: ApiKeyId::new(), user_id: user.id.clone(), key_hash: "abc123".into(), name: "test".into(), created_at: Utc::now() }; + repo.save(&key).await.unwrap(); + let found = repo.find_by_hash("abc123").await.unwrap().unwrap(); + assert_eq!(found.name, "test"); + } + + #[sqlx::test(migrations = "./migrations")] + async fn delete_key(pool: sqlx::PgPool) { + let user = seed_user(&pool).await; + let repo = PgApiKeyRepository::new(pool); + let key = ApiKey { id: ApiKeyId::new(), user_id: user.id.clone(), key_hash: "def456".into(), name: "key2".into(), created_at: Utc::now() }; + repo.save(&key).await.unwrap(); + repo.delete(&key.id, &user.id).await.unwrap(); + assert!(repo.find_by_hash("def456").await.unwrap().is_none()); + } +} diff --git a/crates/adapters/postgres/src/feed.rs b/crates/adapters/postgres/src/feed.rs index 0f7e5ff..2f5e0d8 100644 --- a/crates/adapters/postgres/src/feed.rs +++ b/crates/adapters/postgres/src/feed.rs @@ -1,2 +1,173 @@ -pub struct PgFeedRepository { _pool: sqlx::PgPool } -impl PgFeedRepository { pub fn new(pool: sqlx::PgPool) -> Self { Self { _pool: pool } } } +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use domain::{ + errors::DomainError, + models::{feed::{FeedEntry, PageParams, Paginated}, thought::Thought, user::User}, + ports::FeedRepository, + value_objects::{Content, Email, PasswordHash, ThoughtId, UserId, Username}, +}; +use domain::models::thought::Visibility; + +pub struct PgFeedRepository { pool: PgPool } +impl PgFeedRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } + +#[derive(sqlx::FromRow)] +struct FeedRow { + thought_id: uuid::Uuid, + t_user_id: uuid::Uuid, + content: String, + in_reply_to_id: Option, + in_reply_to_url: Option, + t_ap_id: Option, + visibility: String, + content_warning: Option, + sensitive: bool, + t_local: bool, + thought_created_at: DateTime, + updated_at: Option>, + author_id: uuid::Uuid, + username: String, + email: String, + password_hash: String, + display_name: Option, + bio: Option, + avatar_url: Option, + header_url: Option, + custom_css: Option, + author_local: bool, + u_ap_id: Option, + inbox_url: Option, + public_key: Option, + private_key: Option, + author_created_at: DateTime, + author_updated_at: DateTime, + like_count: i64, + boost_count: i64, + reply_count: i64, +} + +const FEED_SELECT: &str = " + SELECT + t.id AS thought_id, t.user_id AS t_user_id, t.content, + t.in_reply_to_id, t.in_reply_to_url, t.ap_id AS t_ap_id, + t.visibility, t.content_warning, t.sensitive, t.local AS t_local, + t.created_at AS thought_created_at, t.updated_at, + u.id AS author_id, u.username, u.email, u.password_hash, + u.display_name, u.bio, u.avatar_url, u.header_url, u.custom_css, + u.local AS author_local, u.ap_id AS u_ap_id, u.inbox_url, + u.public_key, u.private_key, + u.created_at AS author_created_at, u.updated_at AS author_updated_at, + (SELECT COUNT(*) FROM likes l WHERE l.thought_id=t.id) AS like_count, + (SELECT COUNT(*) FROM boosts b WHERE b.thought_id=t.id) AS boost_count, + (SELECT COUNT(*) FROM thoughts r WHERE r.in_reply_to_id=t.id) AS reply_count + FROM thoughts t JOIN users u ON u.id=t.user_id"; + +fn row_to_entry(r: FeedRow) -> FeedEntry { + let thought = Thought { + id: ThoughtId::from_uuid(r.thought_id), + user_id: UserId::from_uuid(r.t_user_id), + content: Content::new_remote(r.content), + in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), + in_reply_to_url: r.in_reply_to_url, + ap_id: r.t_ap_id, + visibility: Visibility::from_str(&r.visibility), + content_warning: r.content_warning, + sensitive: r.sensitive, + local: r.t_local, + created_at: r.thought_created_at, + updated_at: r.updated_at, + }; + let author = User { + id: UserId::from_uuid(r.author_id), + username: Username::from_trusted(r.username), + email: Email::from_trusted(r.email), + password_hash: PasswordHash(r.password_hash), + display_name: r.display_name, bio: r.bio, + avatar_url: r.avatar_url, header_url: r.header_url, custom_css: r.custom_css, + local: r.author_local, ap_id: r.u_ap_id, inbox_url: r.inbox_url, + public_key: r.public_key, private_key: r.private_key, + created_at: r.author_created_at, updated_at: r.author_updated_at, + }; + FeedEntry { thought, author, like_count: r.like_count, boost_count: r.boost_count, reply_count: r.reply_count, liked_by_viewer: false, boosted_by_viewer: false } +} + +#[async_trait] +impl FeedRepository for PgFeedRepository { + async fn home_feed(&self, following_ids: &[UserId], page: &PageParams, _viewer_id: Option<&UserId>) -> Result, DomainError> { + let ids: Vec = following_ids.iter().map(|id| id.as_uuid()).collect(); + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thoughts t WHERE t.user_id=ANY($1) AND t.visibility='public'" + ).bind(&ids).fetch_one(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + let sql = format!("{FEED_SELECT} WHERE t.user_id=ANY($1) AND t.visibility='public' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3"); + let rows = sqlx::query_as::<_, FeedRow>(&sql) + .bind(&ids).bind(page.limit()).bind(page.offset()) + .fetch_all(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(Paginated { items: rows.into_iter().map(row_to_entry).collect(), total, page: page.page, per_page: page.per_page }) + } + + async fn public_feed(&self, page: &PageParams, _viewer_id: Option<&UserId>) -> Result, DomainError> { + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thoughts t WHERE t.local=true AND t.visibility='public'" + ).fetch_one(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + let sql = format!("{FEED_SELECT} WHERE t.local=true AND t.visibility='public' ORDER BY t.created_at DESC LIMIT $1 OFFSET $2"); + let rows = sqlx::query_as::<_, FeedRow>(&sql) + .bind(page.limit()).bind(page.offset()) + .fetch_all(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(Paginated { items: rows.into_iter().map(row_to_entry).collect(), total, page: page.page, per_page: page.per_page }) + } + + async fn search(&self, query: &str, page: &PageParams, _viewer_id: Option<&UserId>) -> Result, DomainError> { + let pattern = format!("%{}%", query.replace('%', "\\%").replace('_', "\\_")); + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thoughts t WHERE t.content ILIKE $1 AND t.visibility='public'" + ).bind(&pattern).fetch_one(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + let sql = format!("{FEED_SELECT} WHERE t.content ILIKE $1 AND t.visibility='public' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3"); + let rows = sqlx::query_as::<_, FeedRow>(&sql) + .bind(&pattern).bind(page.limit()).bind(page.offset()) + .fetch_all(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(Paginated { items: rows.into_iter().map(row_to_entry).collect(), total, page: page.page, per_page: page.per_page }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::{models::{thought::{Thought, Visibility}, user::User}, ports::{ThoughtRepository, UserRepository}, value_objects::*}; + use crate::{thought::PgThoughtRepository, user::PgUserRepository}; + + async fn seed(pool: &sqlx::PgPool, username: &str, content: &str) -> (User, Thought) { + let urepo = PgUserRepository::new(pool.clone()); + let trepo = PgThoughtRepository::new(pool.clone()); + let u = User::new_local(UserId::new(), Username::new(username).unwrap(), Email::new(format!("{username}@ex.com")).unwrap(), PasswordHash("h".into())); + urepo.save(&u).await.unwrap(); + let t = Thought::new_local(ThoughtId::new(), u.id.clone(), Content::new_local(content).unwrap(), None, Visibility::Public, None, false); + trepo.save(&t).await.unwrap(); + (u, t) + } + + #[sqlx::test(migrations = "./migrations")] + async fn public_feed_returns_local_thoughts(pool: sqlx::PgPool) { + let (_, _) = seed(&pool, "alice", "hello").await; + let repo = PgFeedRepository::new(pool); + let result = repo.public_feed(&PageParams { page: 1, per_page: 20 }, None).await.unwrap(); + assert_eq!(result.total, 1); + assert_eq!(result.items[0].thought.content.as_str(), "hello"); + } + + #[sqlx::test(migrations = "./migrations")] + async fn search_returns_matching_thoughts(pool: sqlx::PgPool) { + let (_, _) = seed(&pool, "alice", "hello world").await; + let (_, _) = seed(&pool, "bob", "goodbye world").await; + let repo = PgFeedRepository::new(pool); + let result = repo.search("hello", &PageParams { page: 1, per_page: 20 }, None).await.unwrap(); + assert_eq!(result.total, 1); + assert_eq!(result.items[0].thought.content.as_str(), "hello world"); + } +} diff --git a/crates/adapters/postgres/src/notification.rs b/crates/adapters/postgres/src/notification.rs index 96e20e9..4a13069 100644 --- a/crates/adapters/postgres/src/notification.rs +++ b/crates/adapters/postgres/src/notification.rs @@ -1,2 +1,91 @@ -pub struct PgNotificationRepository { _pool: sqlx::PgPool } -impl PgNotificationRepository { pub fn new(pool: sqlx::PgPool) -> Self { Self { _pool: pool } } } +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use domain::{errors::DomainError, models::{feed::{PageParams, Paginated}, notification::{Notification, NotificationType}}, ports::NotificationRepository, value_objects::{NotificationId, ThoughtId, UserId}}; + +pub struct PgNotificationRepository { pool: PgPool } +impl PgNotificationRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } + +#[async_trait] +impl NotificationRepository for PgNotificationRepository { + async fn save(&self, n: &Notification) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO notifications(id,user_id,type,from_user_id,thought_id,read,created_at) VALUES($1,$2,$3,$4,$5,$6,$7)" + ) + .bind(n.id.as_uuid()).bind(n.user_id.as_uuid()).bind(n.notification_type.as_str()) + .bind(n.from_user_id.as_ref().map(|u| u.as_uuid())) + .bind(n.thought_id.as_ref().map(|t| t.as_uuid())) + .bind(n.read).bind(n.created_at) + .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + } + + async fn list_for_user(&self, user_id: &UserId, page: &PageParams) -> Result, DomainError> { + let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM notifications WHERE user_id=$1") + .bind(user_id.as_uuid()).fetch_one(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + #[derive(sqlx::FromRow)] + struct Row { id: uuid::Uuid, user_id: uuid::Uuid, r#type: String, from_user_id: Option, thought_id: Option, read: bool, created_at: DateTime } + let rows = sqlx::query_as::<_, Row>( + "SELECT id,user_id,type,from_user_id,thought_id,read,created_at FROM notifications WHERE user_id=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3" + ).bind(user_id.as_uuid()).bind(page.limit()).bind(page.offset()) + .fetch_all(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + let items = rows.into_iter().map(|r| Notification { + id: NotificationId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id), + notification_type: NotificationType::from_str(&r.r#type), + from_user_id: r.from_user_id.map(UserId::from_uuid), + thought_id: r.thought_id.map(ThoughtId::from_uuid), + read: r.read, created_at: r.created_at, + }).collect(); + Ok(Paginated { items, total, page: page.page, per_page: page.per_page }) + } + + async fn mark_read(&self, id: &NotificationId, user_id: &UserId) -> Result<(), DomainError> { + sqlx::query("UPDATE notifications SET read=true WHERE id=$1 AND user_id=$2") + .bind(id.as_uuid()).bind(user_id.as_uuid()) + .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + } + + async fn mark_all_read(&self, user_id: &UserId) -> Result<(), DomainError> { + sqlx::query("UPDATE notifications SET read=true WHERE user_id=$1") + .bind(user_id.as_uuid()) + .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use domain::{models::{notification::NotificationType, user::User}, value_objects::*}; + use crate::user::PgUserRepository; + use domain::ports::UserRepository; + + async fn seed_user(pool: &sqlx::PgPool) -> User { + let repo = PgUserRepository::new(pool.clone()); + let u = User::new_local(UserId::new(), Username::new("alice").unwrap(), Email::new("alice@ex.com").unwrap(), PasswordHash("h".into())); + repo.save(&u).await.unwrap(); u + } + + #[sqlx::test(migrations = "./migrations")] + async fn save_and_list(pool: sqlx::PgPool) { + let user = seed_user(&pool).await; + let repo = PgNotificationRepository::new(pool); + use domain::models::feed::PageParams; + let n = Notification { id: NotificationId::new(), user_id: user.id.clone(), notification_type: NotificationType::Like, from_user_id: None, thought_id: None, read: false, created_at: Utc::now() }; + repo.save(&n).await.unwrap(); + let page = repo.list_for_user(&user.id, &PageParams { page: 1, per_page: 20 }).await.unwrap(); + assert_eq!(page.total, 1); + assert!(!page.items[0].read); + } + + #[sqlx::test(migrations = "./migrations")] + async fn mark_all_read(pool: sqlx::PgPool) { + let user = seed_user(&pool).await; + let repo = PgNotificationRepository::new(pool); + use domain::models::feed::PageParams; + let n = Notification { id: NotificationId::new(), user_id: user.id.clone(), notification_type: NotificationType::Follow, from_user_id: None, thought_id: None, read: false, created_at: Utc::now() }; + repo.save(&n).await.unwrap(); + repo.mark_all_read(&user.id).await.unwrap(); + let page = repo.list_for_user(&user.id, &PageParams { page: 1, per_page: 20 }).await.unwrap(); + assert!(page.items[0].read); + } +} diff --git a/crates/adapters/postgres/src/remote_actor.rs b/crates/adapters/postgres/src/remote_actor.rs index 90fe94a..4fe912e 100644 --- a/crates/adapters/postgres/src/remote_actor.rs +++ b/crates/adapters/postgres/src/remote_actor.rs @@ -1,2 +1,33 @@ -pub struct PgRemoteActorRepository { _pool: sqlx::PgPool } -impl PgRemoteActorRepository { pub fn new(pool: sqlx::PgPool) -> Self { Self { _pool: pool } } } +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use domain::{errors::DomainError, models::remote_actor::RemoteActor, ports::RemoteActorRepository}; + +pub struct PgRemoteActorRepository { pool: PgPool } +impl PgRemoteActorRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } + +#[async_trait] +impl RemoteActorRepository for PgRemoteActorRepository { + async fn upsert(&self, a: &RemoteActor) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO remote_actors(url,handle,display_name,inbox_url,shared_inbox_url,public_key,last_fetched_at) + VALUES($1,$2,$3,$4,$5,$6,$7) + ON CONFLICT(url) DO UPDATE SET handle=EXCLUDED.handle,display_name=EXCLUDED.display_name, + inbox_url=EXCLUDED.inbox_url,shared_inbox_url=EXCLUDED.shared_inbox_url, + public_key=EXCLUDED.public_key,last_fetched_at=EXCLUDED.last_fetched_at" + ) + .bind(&a.url).bind(&a.handle).bind(&a.display_name).bind(&a.inbox_url) + .bind(&a.shared_inbox_url).bind(&a.public_key).bind(a.last_fetched_at) + .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + } + + async fn find_by_url(&self, url: &str) -> Result, DomainError> { + #[derive(sqlx::FromRow)] + struct Row { url: String, handle: String, display_name: Option, inbox_url: String, shared_inbox_url: Option, public_key: String, last_fetched_at: DateTime } + sqlx::query_as::<_, Row>( + "SELECT url,handle,display_name,inbox_url,shared_inbox_url,public_key,last_fetched_at FROM remote_actors WHERE url=$1" + ).bind(url).fetch_optional(&self.pool).await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|o| o.map(|r| RemoteActor { url: r.url, handle: r.handle, display_name: r.display_name, inbox_url: r.inbox_url, shared_inbox_url: r.shared_inbox_url, public_key: r.public_key, last_fetched_at: r.last_fetched_at })) + } +} diff --git a/crates/adapters/postgres/src/tag.rs b/crates/adapters/postgres/src/tag.rs index c78c388..3beca5d 100644 --- a/crates/adapters/postgres/src/tag.rs +++ b/crates/adapters/postgres/src/tag.rs @@ -1,2 +1,87 @@ -pub struct PgTagRepository { _pool: sqlx::PgPool } -impl PgTagRepository { pub fn new(pool: sqlx::PgPool) -> Self { Self { _pool: pool } } } +use async_trait::async_trait; +use sqlx::PgPool; +use domain::{errors::DomainError, models::{feed::{PageParams, Paginated}, tag::Tag, thought::Thought}, ports::TagRepository, value_objects::ThoughtId}; + +pub struct PgTagRepository { pool: PgPool } +impl PgTagRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } + +#[async_trait] +impl TagRepository for PgTagRepository { + async fn find_or_create(&self, name: &str) -> Result { + let name = name.to_lowercase(); + sqlx::query("INSERT INTO tags(name) VALUES($1) ON CONFLICT(name) DO NOTHING") + .bind(&name).execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + #[derive(sqlx::FromRow)] struct Row { id: i32, name: String } + let row = sqlx::query_as::<_, Row>("SELECT id,name FROM tags WHERE name=$1").bind(&name) + .fetch_one(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(Tag { id: row.id, name: row.name }) + } + + async fn attach_to_thought(&self, thought_id: &ThoughtId, tag_id: i32) -> Result<(), DomainError> { + sqlx::query("INSERT INTO thought_tags(thought_id,tag_id) VALUES($1,$2) ON CONFLICT DO NOTHING") + .bind(thought_id.as_uuid()).bind(tag_id) + .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + } + + async fn detach_from_thought(&self, thought_id: &ThoughtId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM thought_tags WHERE thought_id=$1").bind(thought_id.as_uuid()) + .execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ()) + } + + async fn list_for_thought(&self, thought_id: &ThoughtId) -> Result, DomainError> { + #[derive(sqlx::FromRow)] struct Row { id: i32, name: String } + sqlx::query_as::<_, Row>( + "SELECT t.id,t.name FROM tags t JOIN thought_tags tt ON tt.tag_id=t.id WHERE tt.thought_id=$1" + ).bind(thought_id.as_uuid()).fetch_all(&self.pool).await + .map_err(|e| DomainError::Internal(e.to_string())) + .map(|rows| rows.into_iter().map(|r| Tag { id: r.id, name: r.name }).collect()) + } + + async fn list_thoughts_by_tag(&self, tag_name: &str, page: &PageParams) -> Result, DomainError> { + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thought_tags tt JOIN tags t ON t.id=tt.tag_id WHERE t.name=$1" + ).bind(tag_name).fetch_one(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + let rows = sqlx::query_as::<_, crate::thought::ThoughtRow>( + "SELECT th.id,th.user_id,th.content,th.in_reply_to_id,th.in_reply_to_url,th.ap_id,th.visibility,th.content_warning,th.sensitive,th.local,th.created_at,th.updated_at + FROM thoughts th JOIN thought_tags tt ON tt.thought_id=th.id JOIN tags t ON t.id=tt.tag_id + WHERE t.name=$1 ORDER BY th.created_at DESC LIMIT $2 OFFSET $3" + ).bind(tag_name).bind(page.limit()).bind(page.offset()) + .fetch_all(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(Paginated { items: rows.into_iter().map(Thought::from).collect(), total, page: page.page, per_page: page.per_page }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::{models::{thought::{Thought, Visibility}, user::User}, value_objects::*}; + use crate::{thought::PgThoughtRepository, user::PgUserRepository}; + use domain::ports::{ThoughtRepository, UserRepository}; + + #[sqlx::test(migrations = "./migrations")] + async fn find_or_create_tag(pool: sqlx::PgPool) { + let repo = PgTagRepository::new(pool); + let t1 = repo.find_or_create("rust").await.unwrap(); + let t2 = repo.find_or_create("rust").await.unwrap(); + assert_eq!(t1.id, t2.id); + assert_eq!(t1.name, "rust"); + } + + #[sqlx::test(migrations = "./migrations")] + async fn attach_and_list(pool: sqlx::PgPool) { + let urepo = PgUserRepository::new(pool.clone()); + let trepo = PgThoughtRepository::new(pool.clone()); + let u = User::new_local(UserId::new(), Username::new("alice").unwrap(), Email::new("alice@ex.com").unwrap(), PasswordHash("h".into())); + urepo.save(&u).await.unwrap(); + let t = Thought::new_local(ThoughtId::new(), u.id.clone(), Content::new_local("hi").unwrap(), None, Visibility::Public, None, false); + trepo.save(&t).await.unwrap(); + let repo = PgTagRepository::new(pool); + let tag = repo.find_or_create("greetings").await.unwrap(); + repo.attach_to_thought(&t.id, tag.id).await.unwrap(); + let tags = repo.list_for_thought(&t.id).await.unwrap(); + assert_eq!(tags.len(), 1); + assert_eq!(tags[0].name, "greetings"); + } +} diff --git a/crates/adapters/postgres/src/top_friend.rs b/crates/adapters/postgres/src/top_friend.rs index ee5896f..1e691a4 100644 --- a/crates/adapters/postgres/src/top_friend.rs +++ b/crates/adapters/postgres/src/top_friend.rs @@ -1,2 +1,95 @@ -pub struct PgTopFriendRepository { _pool: sqlx::PgPool } -impl PgTopFriendRepository { pub fn new(pool: sqlx::PgPool) -> Self { Self { _pool: pool } } } +use async_trait::async_trait; +use sqlx::PgPool; +use domain::{errors::DomainError, models::{top_friend::TopFriend, user::User}, ports::TopFriendRepository, value_objects::UserId}; + +pub struct PgTopFriendRepository { pool: PgPool } +impl PgTopFriendRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } + +#[async_trait] +impl TopFriendRepository for PgTopFriendRepository { + async fn set_top_friends(&self, user_id: &UserId, friends: Vec<(UserId, i16)>) -> Result<(), DomainError> { + let mut tx = self.pool.begin().await.map_err(|e| DomainError::Internal(e.to_string()))?; + sqlx::query("DELETE FROM top_friends WHERE user_id=$1") + .bind(user_id.as_uuid()).execute(&mut *tx).await.map_err(|e| DomainError::Internal(e.to_string()))?; + for (friend_id, pos) in friends { + sqlx::query("INSERT INTO top_friends(user_id,friend_id,position) VALUES($1,$2,$3)") + .bind(user_id.as_uuid()).bind(friend_id.as_uuid()).bind(pos) + .execute(&mut *tx).await.map_err(|e| DomainError::Internal(e.to_string()))?; + } + tx.commit().await.map_err(|e| DomainError::Internal(e.to_string())) + } + + async fn list_for_user(&self, user_id: &UserId) -> Result, DomainError> { + #[derive(sqlx::FromRow)] + struct Row { + tf_user_id: uuid::Uuid, friend_id: uuid::Uuid, position: i16, + id: uuid::Uuid, username: String, email: String, password_hash: String, + display_name: Option, bio: Option, avatar_url: Option, + header_url: Option, custom_css: Option, local: bool, + ap_id: Option, inbox_url: Option, public_key: Option, + private_key: Option, + created_at: chrono::DateTime, updated_at: chrono::DateTime, + } + let rows = sqlx::query_as::<_, Row>( + "SELECT tf.user_id AS tf_user_id, tf.friend_id, tf.position, + u.id, u.username, u.email, u.password_hash, u.display_name, u.bio, + u.avatar_url, u.header_url, u.custom_css, u.local, u.ap_id, u.inbox_url, + u.public_key, u.private_key, u.created_at, u.updated_at + FROM top_friends tf JOIN users u ON u.id=tf.friend_id + WHERE tf.user_id=$1 ORDER BY tf.position" + ).bind(user_id.as_uuid()).fetch_all(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(|r| { + use domain::value_objects::{Email, PasswordHash, Username}; + let tf = TopFriend { user_id: UserId::from_uuid(r.tf_user_id), friend_id: UserId::from_uuid(r.friend_id), position: r.position }; + let u = User { + id: UserId::from_uuid(r.id), username: Username::from_trusted(r.username), + email: Email::from_trusted(r.email), password_hash: PasswordHash(r.password_hash), + display_name: r.display_name, bio: r.bio, avatar_url: r.avatar_url, + header_url: r.header_url, custom_css: r.custom_css, local: r.local, + ap_id: r.ap_id, inbox_url: r.inbox_url, public_key: r.public_key, + private_key: r.private_key, created_at: r.created_at, updated_at: r.updated_at, + }; + (tf, u) + }).collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::{models::user::User, value_objects::*}; + use crate::user::PgUserRepository; + use domain::ports::UserRepository; + + async fn seed_user(pool: &sqlx::PgPool, username: &str, email: &str) -> User { + let repo = PgUserRepository::new(pool.clone()); + let u = User::new_local(UserId::new(), Username::new(username).unwrap(), Email::new(email).unwrap(), PasswordHash("h".into())); + repo.save(&u).await.unwrap(); u + } + + #[sqlx::test(migrations = "./migrations")] + async fn set_and_list_top_friends(pool: sqlx::PgPool) { + let alice = seed_user(&pool, "alice", "alice@ex.com").await; + let bob = seed_user(&pool, "bob", "bob@ex.com").await; + let repo = PgTopFriendRepository::new(pool); + repo.set_top_friends(&alice.id, vec![(bob.id.clone(), 1)]).await.unwrap(); + let friends = repo.list_for_user(&alice.id).await.unwrap(); + assert_eq!(friends.len(), 1); + assert_eq!(friends[0].0.position, 1); + assert_eq!(friends[0].1.username.as_str(), "bob"); + } + + #[sqlx::test(migrations = "./migrations")] + async fn replace_top_friends(pool: sqlx::PgPool) { + let alice = seed_user(&pool, "alice", "alice@ex.com").await; + let bob = seed_user(&pool, "bob", "bob@ex.com").await; + let carol = seed_user(&pool, "carol", "carol@ex.com").await; + let repo = PgTopFriendRepository::new(pool); + repo.set_top_friends(&alice.id, vec![(bob.id.clone(), 1)]).await.unwrap(); + repo.set_top_friends(&alice.id, vec![(carol.id.clone(), 1)]).await.unwrap(); + let friends = repo.list_for_user(&alice.id).await.unwrap(); + assert_eq!(friends.len(), 1); + assert_eq!(friends[0].1.username.as_str(), "carol"); + } +}