diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 4901ae8..bb5c47c 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -2,3 +2,26 @@ name = "worker" version = "0.1.0" edition = "2021" + +[[bin]] +name = "thoughts-worker" +path = "src/main.rs" + +[dependencies] +domain = { workspace = true } +nats = { workspace = true } +event-payload = { workspace = true } +postgres = { workspace = true } +async-nats = { workspace = true } +tokio = { workspace = true, features = ["full"] } +futures = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +dotenvy = { workspace = true } +serde_json = { workspace = true } +chrono = { workspace = true } +sqlx = { workspace = true } + +[dev-dependencies] +domain = { workspace = true, features = ["test-helpers"] } diff --git a/crates/worker/src/handlers.rs b/crates/worker/src/handlers.rs new file mode 100644 index 0000000..af61628 --- /dev/null +++ b/crates/worker/src/handlers.rs @@ -0,0 +1,178 @@ +use std::sync::Arc; +use chrono::Utc; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::notification::{Notification, NotificationType}, + ports::{NotificationRepository, ThoughtRepository}, + value_objects::NotificationId, +}; + +/// Handles domain events that should create notifications for users. +pub struct NotificationHandler { + pub thoughts: Arc, + pub notifications: Arc, +} + +impl NotificationHandler { + pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + match event { + DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => { + let thought = match self.thoughts.find_by_id(thought_id).await? { + Some(t) => t, + None => return Ok(()), // thought deleted — skip + }; + if thought.user_id == *user_id { return Ok(()); } // no self-notifications + self.notifications.save(&Notification { + id: NotificationId::new(), + user_id: thought.user_id, + notification_type: NotificationType::Like, + from_user_id: Some(user_id.clone()), + thought_id: Some(thought_id.clone()), + read: false, + created_at: Utc::now(), + }).await + } + DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => { + let thought = match self.thoughts.find_by_id(thought_id).await? { + Some(t) => t, + None => return Ok(()), + }; + if thought.user_id == *user_id { return Ok(()); } + self.notifications.save(&Notification { + id: NotificationId::new(), + user_id: thought.user_id, + notification_type: NotificationType::Boost, + from_user_id: Some(user_id.clone()), + thought_id: Some(thought_id.clone()), + read: false, + created_at: Utc::now(), + }).await + } + DomainEvent::FollowAccepted { follower_id, following_id } => { + // The person being followed (following_id) gets notified + self.notifications.save(&Notification { + id: NotificationId::new(), + user_id: following_id.clone(), + notification_type: NotificationType::Follow, + from_user_id: Some(follower_id.clone()), + thought_id: None, + read: false, + created_at: Utc::now(), + }).await + } + // All other events: no notification needed in Plan 3 + _ => Ok(()), + } + } +} + +/// Stub handler for ActivityPub federation — implemented in Plan 4. +pub struct FederationHandler; + +impl FederationHandler { + pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + tracing::debug!(?event, "federation handler (stub — Plan 4)"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::{ + models::{thought::{Thought, Visibility}, user::User}, + testing::TestStore, + value_objects::*, + }; + use std::sync::Arc; + + fn alice() -> User { + User::new_local( + UserId::new(), + Username::new("alice").unwrap(), + Email::new("alice@ex.com").unwrap(), + PasswordHash("h".into()), + ) + } + + #[tokio::test] + async fn like_added_creates_notification_for_thought_author() { + let store = TestStore::default(); + let alice = alice(); + let bob_id = UserId::new(); + + let thought = Thought::new_local( + ThoughtId::new(), alice.id.clone(), + Content::new_local("hello").unwrap(), + None, Visibility::Public, None, false, + ); + store.users.lock().unwrap().push(alice.clone()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let handler = NotificationHandler { + thoughts: Arc::new(store.clone()), + notifications: Arc::new(store.clone()), + }; + + handler.handle(&DomainEvent::LikeAdded { + like_id: LikeId::new(), + user_id: bob_id.clone(), + thought_id: thought.id.clone(), + }).await.unwrap(); + + let notifs = store.notifications.lock().unwrap(); + assert_eq!(notifs.len(), 1); + assert_eq!(notifs[0].user_id, alice.id); + assert!(matches!(notifs[0].notification_type, NotificationType::Like)); + } + + #[tokio::test] + async fn self_like_does_not_create_notification() { + let store = TestStore::default(); + let alice = alice(); + let thought = Thought::new_local( + ThoughtId::new(), alice.id.clone(), + Content::new_local("hello").unwrap(), + None, Visibility::Public, None, false, + ); + store.users.lock().unwrap().push(alice.clone()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let handler = NotificationHandler { + thoughts: Arc::new(store.clone()), + notifications: Arc::new(store.clone()), + }; + + handler.handle(&DomainEvent::LikeAdded { + like_id: LikeId::new(), + user_id: alice.id.clone(), + thought_id: thought.id.clone(), + }).await.unwrap(); + + assert!(store.notifications.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn follow_accepted_creates_notification() { + let store = TestStore::default(); + let alice = alice(); + let bob_id = UserId::new(); + store.users.lock().unwrap().push(alice.clone()); + + let handler = NotificationHandler { + thoughts: Arc::new(store.clone()), + notifications: Arc::new(store.clone()), + }; + + handler.handle(&DomainEvent::FollowAccepted { + follower_id: bob_id.clone(), + following_id: alice.id.clone(), + }).await.unwrap(); + + let notifs = store.notifications.lock().unwrap(); + assert_eq!(notifs.len(), 1); + assert_eq!(notifs[0].user_id, alice.id); + assert!(matches!(notifs[0].notification_type, NotificationType::Follow)); + } +}