From 904916d4c179ec317a5bbf8d1d7f6064cbd57aea Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 13:57:14 +0200 Subject: [PATCH] =?UTF-8?q?refactor(worker):=20thin=20handlers=20+=20facto?= =?UTF-8?q?ry=20=E2=80=94=20move=20all=20business=20logic=20to=20applicati?= =?UTF-8?q?on=20services?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/worker/Cargo.toml | 34 +++-- crates/worker/src/factory.rs | 80 ++++++++++ crates/worker/src/handlers.rs | 268 +--------------------------------- crates/worker/src/main.rs | 33 ++--- 4 files changed, 117 insertions(+), 298 deletions(-) create mode 100644 crates/worker/src/factory.rs diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index c63425c..a1e30b9 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -8,21 +8,25 @@ name = "thoughts-worker" path = "src/main.rs" [dependencies] -domain = { workspace = true } -nats = { workspace = true } -event-payload = { workspace = true } -event-transport = { workspace = true } -postgres = { workspace = true } -async-nats = { workspace = true } -tokio = { workspace = true, features = ["full"] } -futures = { workspace = true } -async-trait = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -dotenvy = { workspace = true } -serde_json = { workspace = true } -chrono = { workspace = true } -sqlx = { workspace = true } +domain = { workspace = true } +application = { workspace = true } +nats = { workspace = true } +event-payload = { workspace = true } +event-transport = { workspace = true } +activitypub-base = { workspace = true } +activitypub = { workspace = true } +postgres = { workspace = true } +postgres-federation = { workspace = true } +async-nats = { workspace = true } +tokio = { workspace = true, features = ["full"] } +futures = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +dotenvy = { workspace = true } +serde_json = { workspace = true } +chrono = { workspace = true } +sqlx = { workspace = true } [dev-dependencies] domain = { workspace = true, features = ["test-helpers"] } diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs new file mode 100644 index 0000000..8c465de --- /dev/null +++ b/crates/worker/src/factory.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; +use sqlx::PgPool; + +use activitypub::ThoughtsObjectHandler; +use activitypub_base::ActivityPubService; +use application::services::{FederationEventService, NotificationEventService}; +use postgres::activitypub::PgActivityPubRepository; +use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; + +use crate::handlers::{FederationHandler, NotificationHandler}; + +pub struct WorkerHandlers { + pub notification: NotificationHandler, + pub federation: FederationHandler, +} + +pub async fn build( + database_url: &str, + base_url: &str, + nats_url: &str, +) -> ( + event_transport::EventConsumerAdapter, + WorkerHandlers, +) { + let pool = PgPool::connect(database_url) + .await + .expect("DB connect failed"); + + // Repos + let thoughts = Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())); + let users = Arc::new(postgres::user::PgUserRepository::new(pool.clone())); + let notifications = Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())); + + // ActivityPub service (for federation fan-out) + let ap_service: Arc = Arc::new( + ActivityPubService::new( + Arc::new(PostgresFederationRepository::new(pool.clone())), + Arc::new(PostgresApUserRepository::new(pool.clone(), base_url.to_string())), + Arc::new(ThoughtsObjectHandler::new( + Arc::new(PgActivityPubRepository::new(pool.clone())), + base_url, + )), + base_url.to_string(), + false, + "thoughts".to_string(), + false, + None, + ) + .await + .expect("ActivityPubService build failed"), + ); + + // Application services + let notification_svc = Arc::new(NotificationEventService { + thoughts: thoughts.clone(), + notifications, + }); + let federation_svc = Arc::new(FederationEventService { + thoughts, + users, + ap: ap_service, + base_url: base_url.to_string(), + }); + + // Thin handlers + let handlers = WorkerHandlers { + notification: NotificationHandler { service: notification_svc }, + federation: FederationHandler { service: federation_svc }, + }; + + // NATS consumer + let nats_client = async_nats::connect(nats_url) + .await + .expect("NATS connect failed"); + let consumer = event_transport::EventConsumerAdapter::new( + nats::NatsMessageSource::new(nats_client), + ); + + (consumer, handlers) +} diff --git a/crates/worker/src/handlers.rs b/crates/worker/src/handlers.rs index 37e04af..cb64d8d 100644 --- a/crates/worker/src/handlers.rs +++ b/crates/worker/src/handlers.rs @@ -1,275 +1,23 @@ use std::sync::Arc; -use chrono::Utc; -use domain::{ - errors::DomainError, - events::DomainEvent, - models::notification::{Notification, NotificationType}, - ports::{NotificationRepository, ThoughtRepository}, - value_objects::NotificationId, -}; +use application::services::{FederationEventService, NotificationEventService}; +use domain::{errors::DomainError, events::DomainEvent}; -/// Handles domain events that should create notifications for users. pub struct NotificationHandler { - pub thoughts: Arc, - pub notifications: Arc, + pub service: Arc, } impl NotificationHandler { pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { - match event { - DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => { - let thought = match self.thoughts.find_by_id(thought_id).await? { - Some(t) => t, - None => return Ok(()), // thought deleted — skip - }; - if thought.user_id == *user_id { return Ok(()); } // no self-notifications - self.notifications.save(&Notification { - id: NotificationId::new(), - user_id: thought.user_id, - notification_type: NotificationType::Like, - from_user_id: Some(user_id.clone()), - thought_id: Some(thought_id.clone()), - read: false, - created_at: Utc::now(), - }).await - } - DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => { - let thought = match self.thoughts.find_by_id(thought_id).await? { - Some(t) => t, - None => return Ok(()), - }; - if thought.user_id == *user_id { return Ok(()); } - self.notifications.save(&Notification { - id: NotificationId::new(), - user_id: thought.user_id, - notification_type: NotificationType::Boost, - from_user_id: Some(user_id.clone()), - thought_id: Some(thought_id.clone()), - read: false, - created_at: Utc::now(), - }).await - } - DomainEvent::FollowAccepted { follower_id, following_id } => { - // The person being followed (following_id) gets notified - self.notifications.save(&Notification { - id: NotificationId::new(), - user_id: following_id.clone(), - notification_type: NotificationType::Follow, - from_user_id: Some(follower_id.clone()), - thought_id: None, - read: false, - created_at: Utc::now(), - }).await - } - DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => { - let reply_to_id = match in_reply_to_id { - Some(id) => id, - None => return Ok(()), // not a reply - }; - let original = match self.thoughts.find_by_id(reply_to_id).await? { - Some(t) => t, - None => return Ok(()), // original deleted - }; - if original.user_id == *user_id { return Ok(()); } // no self-notifications - self.notifications.save(&Notification { - id: NotificationId::new(), - user_id: original.user_id, - notification_type: NotificationType::Reply, - from_user_id: Some(user_id.clone()), - thought_id: Some(thought_id.clone()), - read: false, - created_at: Utc::now(), - }).await - } - // All other events: no notification needed in Plan 3 - _ => Ok(()), - } + self.service.process(event).await } } -/// Stub handler for ActivityPub federation — implemented in Plan 4. -pub struct FederationHandler; +pub struct FederationHandler { + pub service: Arc, +} impl FederationHandler { pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { - tracing::debug!(?event, "federation handler (stub — Plan 4)"); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use domain::{ - models::{thought::{Thought, Visibility}, user::User}, - testing::TestStore, - value_objects::*, - }; - use std::sync::Arc; - - fn alice() -> User { - User::new_local( - UserId::new(), - Username::new("alice").unwrap(), - Email::new("alice@ex.com").unwrap(), - PasswordHash("h".into()), - ) - } - - #[tokio::test] - async fn like_added_creates_notification_for_thought_author() { - let store = TestStore::default(); - let alice = alice(); - let bob_id = UserId::new(); - - let thought = Thought::new_local( - ThoughtId::new(), alice.id.clone(), - Content::new_local("hello").unwrap(), - None, Visibility::Public, None, false, - ); - store.users.lock().unwrap().push(alice.clone()); - store.thoughts.lock().unwrap().push(thought.clone()); - - let handler = NotificationHandler { - thoughts: Arc::new(store.clone()), - notifications: Arc::new(store.clone()), - }; - - handler.handle(&DomainEvent::LikeAdded { - like_id: LikeId::new(), - user_id: bob_id.clone(), - thought_id: thought.id.clone(), - }).await.unwrap(); - - let notifs = store.notifications.lock().unwrap(); - assert_eq!(notifs.len(), 1); - assert_eq!(notifs[0].user_id, alice.id); - assert!(matches!(notifs[0].notification_type, NotificationType::Like)); - } - - #[tokio::test] - async fn self_like_does_not_create_notification() { - let store = TestStore::default(); - let alice = alice(); - let thought = Thought::new_local( - ThoughtId::new(), alice.id.clone(), - Content::new_local("hello").unwrap(), - None, Visibility::Public, None, false, - ); - store.users.lock().unwrap().push(alice.clone()); - store.thoughts.lock().unwrap().push(thought.clone()); - - let handler = NotificationHandler { - thoughts: Arc::new(store.clone()), - notifications: Arc::new(store.clone()), - }; - - handler.handle(&DomainEvent::LikeAdded { - like_id: LikeId::new(), - user_id: alice.id.clone(), - thought_id: thought.id.clone(), - }).await.unwrap(); - - assert!(store.notifications.lock().unwrap().is_empty()); - } - - #[tokio::test] - async fn follow_accepted_creates_notification() { - let store = TestStore::default(); - let alice = alice(); - let bob_id = UserId::new(); - store.users.lock().unwrap().push(alice.clone()); - - let handler = NotificationHandler { - thoughts: Arc::new(store.clone()), - notifications: Arc::new(store.clone()), - }; - - handler.handle(&DomainEvent::FollowAccepted { - follower_id: bob_id.clone(), - following_id: alice.id.clone(), - }).await.unwrap(); - - let notifs = store.notifications.lock().unwrap(); - assert_eq!(notifs.len(), 1); - assert_eq!(notifs[0].user_id, alice.id); - assert!(matches!(notifs[0].notification_type, NotificationType::Follow)); - } - - #[tokio::test] - async fn reply_creates_notification_for_original_author() { - let store = TestStore::default(); - let alice = alice(); - let bob_id = UserId::new(); - - let original = Thought::new_local( - ThoughtId::new(), alice.id.clone(), - Content::new_local("original thought").unwrap(), - None, Visibility::Public, None, false, - ); - store.users.lock().unwrap().push(alice.clone()); - store.thoughts.lock().unwrap().push(original.clone()); - - let handler = NotificationHandler { - thoughts: Arc::new(store.clone()), - notifications: Arc::new(store.clone()), - }; - - handler.handle(&DomainEvent::ThoughtCreated { - thought_id: ThoughtId::new(), - user_id: bob_id.clone(), - in_reply_to_id: Some(original.id.clone()), - }).await.unwrap(); - - let notifs = store.notifications.lock().unwrap(); - assert_eq!(notifs.len(), 1); - assert_eq!(notifs[0].user_id, alice.id); - assert!(matches!(notifs[0].notification_type, NotificationType::Reply)); - } - - #[tokio::test] - async fn self_reply_does_not_create_notification() { - let store = TestStore::default(); - let alice = alice(); - let original = Thought::new_local( - ThoughtId::new(), alice.id.clone(), - Content::new_local("original").unwrap(), - None, Visibility::Public, None, false, - ); - store.users.lock().unwrap().push(alice.clone()); - store.thoughts.lock().unwrap().push(original.clone()); - - let handler = NotificationHandler { - thoughts: Arc::new(store.clone()), - notifications: Arc::new(store.clone()), - }; - - handler.handle(&DomainEvent::ThoughtCreated { - thought_id: ThoughtId::new(), - user_id: alice.id.clone(), - in_reply_to_id: Some(original.id.clone()), - }).await.unwrap(); - - assert!(store.notifications.lock().unwrap().is_empty()); - } - - #[tokio::test] - async fn thought_without_reply_to_creates_no_notification() { - let store = TestStore::default(); - let alice = alice(); - store.users.lock().unwrap().push(alice.clone()); - - let handler = NotificationHandler { - thoughts: Arc::new(store.clone()), - notifications: Arc::new(store.clone()), - }; - - handler.handle(&DomainEvent::ThoughtCreated { - thought_id: ThoughtId::new(), - user_id: alice.id.clone(), - in_reply_to_id: None, - }).await.unwrap(); - - assert!(store.notifications.lock().unwrap().is_empty()); + self.service.process(event).await } } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index f2827ab..5ed2abc 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,8 +1,7 @@ +mod factory; mod handlers; -use std::sync::Arc; use futures::StreamExt; -use sqlx::PgPool; use domain::ports::EventConsumer; #[tokio::main] @@ -14,22 +13,12 @@ async fn main() { let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()); + let base_url = std::env::var("BASE_URL").expect("BASE_URL required"); - tracing::info!("Connecting to postgres..."); - let pool = PgPool::connect(&database_url).await.expect("DB connect failed"); - - tracing::info!("Connecting to NATS at {nats_url}..."); - let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed"); - let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client)); - - let notification_handler = handlers::NotificationHandler { - thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())), - notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())), - }; - let federation_handler = handlers::FederationHandler; + tracing::info!("Building worker..."); + let (consumer, handlers) = factory::build(&database_url, &base_url, &nats_url).await; tracing::info!("Worker started, consuming events..."); - let mut stream = consumer.consume(); while let Some(result) = stream.next().await { match result { @@ -37,20 +26,18 @@ async fn main() { let event = &envelope.event; tracing::debug!(?event, "received event"); - let n_result = notification_handler.handle(event).await; - let f_result = federation_handler.handle(event).await; + let n = handlers.notification.handle(event).await; + let f = handlers.federation.handle(event).await; - if n_result.is_ok() && f_result.is_ok() { + if n.is_ok() && f.is_ok() { (envelope.ack)(); } else { - if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); } - if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); } + if let Err(e) = n { tracing::error!("notification handler: {e}"); } + if let Err(e) = f { tracing::error!("federation handler: {e}"); } (envelope.nack)(); } } - Err(e) => { - tracing::error!("consumer error: {e}"); - } + Err(e) => tracing::error!("consumer error: {e}"), } } }