refactor(worker): thin handlers + factory — move all business logic to application services

This commit is contained in:
2026-05-14 13:57:14 +02:00
parent 057fc29abc
commit 904916d4c1
4 changed files with 117 additions and 298 deletions

View File

@@ -0,0 +1,80 @@
use std::sync::Arc;
use sqlx::PgPool;
use activitypub::ThoughtsObjectHandler;
use activitypub_base::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService};
use postgres::activitypub::PgActivityPubRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
use crate::handlers::{FederationHandler, NotificationHandler};
pub struct WorkerHandlers {
pub notification: NotificationHandler,
pub federation: FederationHandler,
}
pub async fn build(
database_url: &str,
base_url: &str,
nats_url: &str,
) -> (
event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
WorkerHandlers,
) {
let pool = PgPool::connect(database_url)
.await
.expect("DB connect failed");
// Repos
let thoughts = Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone()));
let users = Arc::new(postgres::user::PgUserRepository::new(pool.clone()));
let notifications = Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone()));
// ActivityPub service (for federation fan-out)
let ap_service: Arc<dyn domain::ports::OutboundFederationPort> = Arc::new(
ActivityPubService::new(
Arc::new(PostgresFederationRepository::new(pool.clone())),
Arc::new(PostgresApUserRepository::new(pool.clone(), base_url.to_string())),
Arc::new(ThoughtsObjectHandler::new(
Arc::new(PgActivityPubRepository::new(pool.clone())),
base_url,
)),
base_url.to_string(),
false,
"thoughts".to_string(),
false,
None,
)
.await
.expect("ActivityPubService build failed"),
);
// Application services
let notification_svc = Arc::new(NotificationEventService {
thoughts: thoughts.clone(),
notifications,
});
let federation_svc = Arc::new(FederationEventService {
thoughts,
users,
ap: ap_service,
base_url: base_url.to_string(),
});
// Thin handlers
let handlers = WorkerHandlers {
notification: NotificationHandler { service: notification_svc },
federation: FederationHandler { service: federation_svc },
};
// NATS consumer
let nats_client = async_nats::connect(nats_url)
.await
.expect("NATS connect failed");
let consumer = event_transport::EventConsumerAdapter::new(
nats::NatsMessageSource::new(nats_client),
);
(consumer, handlers)
}

View File

@@ -1,275 +1,23 @@
use std::sync::Arc;
use chrono::Utc;
use domain::{
errors::DomainError,
events::DomainEvent,
models::notification::{Notification, NotificationType},
ports::{NotificationRepository, ThoughtRepository},
value_objects::NotificationId,
};
use application::services::{FederationEventService, NotificationEventService};
use domain::{errors::DomainError, events::DomainEvent};
/// Handles domain events that should create notifications for users.
pub struct NotificationHandler {
pub thoughts: Arc<dyn ThoughtRepository>,
pub notifications: Arc<dyn NotificationRepository>,
pub service: Arc<NotificationEventService>,
}
impl NotificationHandler {
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
match event {
DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => {
let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) => t,
None => return Ok(()), // thought deleted — skip
};
if thought.user_id == *user_id { return Ok(()); } // no self-notifications
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: thought.user_id,
notification_type: NotificationType::Like,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
read: false,
created_at: Utc::now(),
}).await
}
DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => {
let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) => t,
None => return Ok(()),
};
if thought.user_id == *user_id { return Ok(()); }
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: thought.user_id,
notification_type: NotificationType::Boost,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
read: false,
created_at: Utc::now(),
}).await
}
DomainEvent::FollowAccepted { follower_id, following_id } => {
// The person being followed (following_id) gets notified
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: following_id.clone(),
notification_type: NotificationType::Follow,
from_user_id: Some(follower_id.clone()),
thought_id: None,
read: false,
created_at: Utc::now(),
}).await
}
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => {
let reply_to_id = match in_reply_to_id {
Some(id) => id,
None => return Ok(()), // not a reply
};
let original = match self.thoughts.find_by_id(reply_to_id).await? {
Some(t) => t,
None => return Ok(()), // original deleted
};
if original.user_id == *user_id { return Ok(()); } // no self-notifications
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: original.user_id,
notification_type: NotificationType::Reply,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
read: false,
created_at: Utc::now(),
}).await
}
// All other events: no notification needed in Plan 3
_ => Ok(()),
}
self.service.process(event).await
}
}
/// Stub handler for ActivityPub federation — implemented in Plan 4.
pub struct FederationHandler;
pub struct FederationHandler {
pub service: Arc<FederationEventService>,
}
impl FederationHandler {
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
tracing::debug!(?event, "federation handler (stub — Plan 4)");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use domain::{
models::{thought::{Thought, Visibility}, user::User},
testing::TestStore,
value_objects::*,
};
use std::sync::Arc;
fn alice() -> User {
User::new_local(
UserId::new(),
Username::new("alice").unwrap(),
Email::new("alice@ex.com").unwrap(),
PasswordHash("h".into()),
)
}
#[tokio::test]
async fn like_added_creates_notification_for_thought_author() {
let store = TestStore::default();
let alice = alice();
let bob_id = UserId::new();
let thought = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("hello").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(thought.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::LikeAdded {
like_id: LikeId::new(),
user_id: bob_id.clone(),
thought_id: thought.id.clone(),
}).await.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert_eq!(notifs[0].user_id, alice.id);
assert!(matches!(notifs[0].notification_type, NotificationType::Like));
}
#[tokio::test]
async fn self_like_does_not_create_notification() {
let store = TestStore::default();
let alice = alice();
let thought = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("hello").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(thought.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::LikeAdded {
like_id: LikeId::new(),
user_id: alice.id.clone(),
thought_id: thought.id.clone(),
}).await.unwrap();
assert!(store.notifications.lock().unwrap().is_empty());
}
#[tokio::test]
async fn follow_accepted_creates_notification() {
let store = TestStore::default();
let alice = alice();
let bob_id = UserId::new();
store.users.lock().unwrap().push(alice.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::FollowAccepted {
follower_id: bob_id.clone(),
following_id: alice.id.clone(),
}).await.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert_eq!(notifs[0].user_id, alice.id);
assert!(matches!(notifs[0].notification_type, NotificationType::Follow));
}
#[tokio::test]
async fn reply_creates_notification_for_original_author() {
let store = TestStore::default();
let alice = alice();
let bob_id = UserId::new();
let original = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("original thought").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(original.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: bob_id.clone(),
in_reply_to_id: Some(original.id.clone()),
}).await.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert_eq!(notifs[0].user_id, alice.id);
assert!(matches!(notifs[0].notification_type, NotificationType::Reply));
}
#[tokio::test]
async fn self_reply_does_not_create_notification() {
let store = TestStore::default();
let alice = alice();
let original = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("original").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(original.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: alice.id.clone(),
in_reply_to_id: Some(original.id.clone()),
}).await.unwrap();
assert!(store.notifications.lock().unwrap().is_empty());
}
#[tokio::test]
async fn thought_without_reply_to_creates_no_notification() {
let store = TestStore::default();
let alice = alice();
store.users.lock().unwrap().push(alice.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: alice.id.clone(),
in_reply_to_id: None,
}).await.unwrap();
assert!(store.notifications.lock().unwrap().is_empty());
self.service.process(event).await
}
}

View File

@@ -1,8 +1,7 @@
mod factory;
mod handlers;
use std::sync::Arc;
use futures::StreamExt;
use sqlx::PgPool;
use domain::ports::EventConsumer;
#[tokio::main]
@@ -14,22 +13,12 @@ async fn main() {
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
let base_url = std::env::var("BASE_URL").expect("BASE_URL required");
tracing::info!("Connecting to postgres...");
let pool = PgPool::connect(&database_url).await.expect("DB connect failed");
tracing::info!("Connecting to NATS at {nats_url}...");
let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed");
let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client));
let notification_handler = handlers::NotificationHandler {
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
};
let federation_handler = handlers::FederationHandler;
tracing::info!("Building worker...");
let (consumer, handlers) = factory::build(&database_url, &base_url, &nats_url).await;
tracing::info!("Worker started, consuming events...");
let mut stream = consumer.consume();
while let Some(result) = stream.next().await {
match result {
@@ -37,20 +26,18 @@ async fn main() {
let event = &envelope.event;
tracing::debug!(?event, "received event");
let n_result = notification_handler.handle(event).await;
let f_result = federation_handler.handle(event).await;
let n = handlers.notification.handle(event).await;
let f = handlers.federation.handle(event).await;
if n_result.is_ok() && f_result.is_ok() {
if n.is_ok() && f.is_ok() {
(envelope.ack)();
} else {
if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); }
if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); }
if let Err(e) = n { tracing::error!("notification handler: {e}"); }
if let Err(e) = f { tracing::error!("federation handler: {e}"); }
(envelope.nack)();
}
}
Err(e) => {
tracing::error!("consumer error: {e}");
}
Err(e) => tracing::error!("consumer error: {e}"),
}
}
}