From 02de6b6f83a21a4b20c220f623f1cc487d47390e Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 09:45:57 +0200 Subject: [PATCH] docs: v2 Plan 3 events+worker implementation plan --- .../plans/2026-05-14-v2-plan3-events.md | 996 ++++++++++++++++++ 1 file changed, 996 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-14-v2-plan3-events.md diff --git a/docs/superpowers/plans/2026-05-14-v2-plan3-events.md b/docs/superpowers/plans/2026-05-14-v2-plan3-events.md new file mode 100644 index 0000000..498cec4 --- /dev/null +++ b/docs/superpowers/plans/2026-05-14-v2-plan3-events.md @@ -0,0 +1,996 @@ +# Thoughts v2 — Plan 3: Events + Worker (NATS) + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Wire real async event processing — use cases publish domain events to NATS, a worker binary subscribes and runs handlers (NotificationHandler creates DB records; FederationHandler is stubbed for Plan 4). + +**Architecture:** `event-payload/` holds the serializable NATS wire types. `nats/` wraps `async-nats` and implements both `EventPublisher` (publish to NATS) and `EventConsumer` (subscribe, yield `EventEnvelope` stream). `worker/` is a standalone binary that consumes events and dispatches to handlers. `presentation/` swaps its `NoOpEventPublisher` for the real NATS publisher. `event-publisher/` stays a stub (future fan-out to multiple backends). + +**Tech Stack:** Rust, async-nats 0.38, serde_json, futures, async-stream, tokio + +**Prerequisites:** NATS server running locally. Start with: +```bash +docker run -d --name nats -p 4222:4222 nats:latest +# or add to docker-compose if preferred +``` + +--- + +## File Map + +``` +Modified: Cargo.toml ← add async-nats, async-stream to workspace.dependencies +Modified: crates/adapters/event-payload/Cargo.toml ← add deps +Modified: crates/adapters/event-payload/src/lib.rs ← EventPayload enum + subject() + From<&DomainEvent> +Modified: crates/adapters/nats/Cargo.toml ← add deps +Modified: crates/adapters/nats/src/lib.rs ← NatsEventPublisher + NatsEventConsumer +Modified: crates/worker/Cargo.toml ← add deps, add [[bin]] +Create: crates/worker/src/handlers.rs ← NotificationHandler, FederationHandler (stub) +Modified: crates/worker/src/main.rs ← consumer loop binary +Modified: crates/presentation/src/lib.rs ← swap NoOp for NatsEventPublisher +Modified: crates/presentation/Cargo.toml ← add nats dep +``` + +--- + +### Task 1: Workspace deps + event-payload crate + +**Files:** +- Modify: `Cargo.toml` (root workspace) +- Modify: `crates/adapters/event-payload/Cargo.toml` +- Modify: `crates/adapters/event-payload/src/lib.rs` + +- [ ] **Add to root `Cargo.toml` `[workspace.dependencies]`:** + +```toml +async-nats = "0.38" +async-stream = "0.3" + +event-payload = { path = "crates/adapters/event-payload" } +event-publisher = { path = "crates/adapters/event-publisher" } +nats = { path = "crates/adapters/nats" } +``` + +Check if `event-payload`, `event-publisher`, `nats` are already listed — they should be from Plan 1 scaffolding. If so, skip those lines and only add `async-nats` and `async-stream`. + +- [ ] **Write `crates/adapters/event-payload/Cargo.toml`:** + +```toml +[package] +name = "event-payload" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { workspace = true } +serde_json = { workspace = true } +``` + +- [ ] **Write `crates/adapters/event-payload/src/lib.rs`:** + +```rust +use serde::{Deserialize, Serialize}; + +/// Serializable mirror of domain::events::DomainEvent. +/// All IDs are Strings (UUID hex) — no domain type dependencies. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", content = "data")] +pub enum EventPayload { + ThoughtCreated { + thought_id: String, + user_id: String, + in_reply_to_id: Option, + }, + ThoughtDeleted { + thought_id: String, + user_id: String, + }, + ThoughtUpdated { + thought_id: String, + user_id: String, + }, + LikeAdded { + like_id: String, + user_id: String, + thought_id: String, + }, + LikeRemoved { + user_id: String, + thought_id: String, + }, + BoostAdded { + boost_id: String, + user_id: String, + thought_id: String, + }, + BoostRemoved { + user_id: String, + thought_id: String, + }, + FollowRequested { + follower_id: String, + following_id: String, + }, + FollowAccepted { + follower_id: String, + following_id: String, + }, + FollowRejected { + follower_id: String, + following_id: String, + }, + Unfollowed { + follower_id: String, + following_id: String, + }, + UserBlocked { + blocker_id: String, + blocked_id: String, + }, +} + +impl EventPayload { + /// Returns the NATS subject for this event. + pub fn subject(&self) -> &'static str { + match self { + Self::ThoughtCreated { .. } => "thoughts.created", + Self::ThoughtDeleted { .. } => "thoughts.deleted", + Self::ThoughtUpdated { .. } => "thoughts.updated", + Self::LikeAdded { .. } => "likes.added", + Self::LikeRemoved { .. } => "likes.removed", + Self::BoostAdded { .. } => "boosts.added", + Self::BoostRemoved { .. } => "boosts.removed", + Self::FollowRequested { .. } => "follows.requested", + Self::FollowAccepted { .. } => "follows.accepted", + Self::FollowRejected { .. } => "follows.rejected", + Self::Unfollowed { .. } => "follows.removed", + Self::UserBlocked { .. } => "users.blocked", + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn thought_created_roundtrip() { + let p = EventPayload::ThoughtCreated { + thought_id: "abc".into(), + user_id: "def".into(), + in_reply_to_id: None, + }; + let json = serde_json::to_string(&p).unwrap(); + let back: EventPayload = serde_json::from_str(&json).unwrap(); + assert_eq!(back.subject(), "thoughts.created"); + } + + #[test] + fn all_subjects_are_unique() { + let samples: &[EventPayload] = &[ + EventPayload::ThoughtCreated { thought_id: "a".into(), user_id: "b".into(), in_reply_to_id: None }, + EventPayload::ThoughtDeleted { thought_id: "a".into(), user_id: "b".into() }, + EventPayload::ThoughtUpdated { thought_id: "a".into(), user_id: "b".into() }, + EventPayload::LikeAdded { like_id: "a".into(), user_id: "b".into(), thought_id: "c".into() }, + EventPayload::LikeRemoved { user_id: "b".into(), thought_id: "c".into() }, + EventPayload::BoostAdded { boost_id: "a".into(), user_id: "b".into(), thought_id: "c".into() }, + EventPayload::BoostRemoved { user_id: "b".into(), thought_id: "c".into() }, + EventPayload::FollowRequested { follower_id: "a".into(), following_id: "b".into() }, + EventPayload::FollowAccepted { follower_id: "a".into(), following_id: "b".into() }, + EventPayload::FollowRejected { follower_id: "a".into(), following_id: "b".into() }, + EventPayload::Unfollowed { follower_id: "a".into(), following_id: "b".into() }, + EventPayload::UserBlocked { blocker_id: "a".into(), blocked_id: "b".into() }, + ]; + let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect(); + subjects.sort(); + subjects.dedup(); + assert_eq!(subjects.len(), samples.len(), "each event must have a unique subject"); + } +} +``` + +- [ ] **Run:** `cargo test -p event-payload` + Expected: 2 tests pass. + +- [ ] **Commit:** +```bash +git add Cargo.toml crates/adapters/event-payload/ +git commit -m "feat(event-payload): serializable NATS event payload types" +``` + +--- + +### Task 2: nats crate — NatsEventPublisher + NatsEventConsumer + +**Files:** +- Modify: `crates/adapters/nats/Cargo.toml` +- Modify: `crates/adapters/nats/src/lib.rs` + +- [ ] **Write `crates/adapters/nats/Cargo.toml`:** + +```toml +[package] +name = "nats" +version = "0.1.0" +edition = "2021" + +[dependencies] +domain = { workspace = true } +event-payload = { workspace = true } +async-nats = { workspace = true } +async-stream = { workspace = true } +serde_json = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +``` + +- [ ] **Write test** at bottom of `crates/adapters/nats/src/lib.rs`: + +```rust +#[cfg(test)] +mod tests { + use super::*; + use domain::value_objects::{ThoughtId, UserId}; + + #[test] + fn payload_from_domain_event_has_correct_subject() { + let event = domain::events::DomainEvent::ThoughtCreated { + thought_id: ThoughtId::new(), + user_id: UserId::new(), + in_reply_to_id: None, + }; + let payload = EventPayload::from(&event); + assert_eq!(payload.subject(), "thoughts.created"); + } + + #[test] + fn domain_event_roundtrip_via_payload() { + let uid = UserId::new(); + let tid = ThoughtId::new(); + let event = domain::events::DomainEvent::LikeAdded { + like_id: domain::value_objects::LikeId::new(), + user_id: uid.clone(), + thought_id: tid.clone(), + }; + let payload = EventPayload::from(&event); + let back = domain::events::DomainEvent::try_from(payload).unwrap(); + if let domain::events::DomainEvent::LikeAdded { user_id, thought_id, .. } = back { + assert_eq!(user_id, uid); + assert_eq!(thought_id, tid); + } else { + panic!("wrong variant"); + } + } +} +``` + +- [ ] **Run:** `cargo test -p nats` — Expected: FAIL (lib.rs is empty). + +- [ ] **Write `crates/adapters/nats/src/lib.rs`:** + +```rust +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::{DomainEvent, EventEnvelope}, + ports::{EventConsumer, EventPublisher}, + value_objects::{BoostId, LikeId, ThoughtId, UserId}, +}; +use event_payload::EventPayload; +use futures::stream::BoxStream; + +// ── DomainEvent → EventPayload ───────────────────────────────────────────── + +impl From<&DomainEvent> for EventPayload { + fn from(e: &DomainEvent) -> Self { + match e { + DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => Self::ThoughtCreated { + thought_id: thought_id.to_string(), + user_id: user_id.to_string(), + in_reply_to_id: in_reply_to_id.as_ref().map(|x| x.to_string()), + }, + DomainEvent::ThoughtDeleted { thought_id, user_id } => Self::ThoughtDeleted { + thought_id: thought_id.to_string(), user_id: user_id.to_string(), + }, + DomainEvent::ThoughtUpdated { thought_id, user_id } => Self::ThoughtUpdated { + thought_id: thought_id.to_string(), user_id: user_id.to_string(), + }, + DomainEvent::LikeAdded { like_id, user_id, thought_id } => Self::LikeAdded { + like_id: like_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(), + }, + DomainEvent::LikeRemoved { user_id, thought_id } => Self::LikeRemoved { + user_id: user_id.to_string(), thought_id: thought_id.to_string(), + }, + DomainEvent::BoostAdded { boost_id, user_id, thought_id } => Self::BoostAdded { + boost_id: boost_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(), + }, + DomainEvent::BoostRemoved { user_id, thought_id } => Self::BoostRemoved { + user_id: user_id.to_string(), thought_id: thought_id.to_string(), + }, + DomainEvent::FollowRequested { follower_id, following_id } => Self::FollowRequested { + follower_id: follower_id.to_string(), following_id: following_id.to_string(), + }, + DomainEvent::FollowAccepted { follower_id, following_id } => Self::FollowAccepted { + follower_id: follower_id.to_string(), following_id: following_id.to_string(), + }, + DomainEvent::FollowRejected { follower_id, following_id } => Self::FollowRejected { + follower_id: follower_id.to_string(), following_id: following_id.to_string(), + }, + DomainEvent::Unfollowed { follower_id, following_id } => Self::Unfollowed { + follower_id: follower_id.to_string(), following_id: following_id.to_string(), + }, + DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked { + blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(), + }, + } + } +} + +// ── EventPayload → DomainEvent ───────────────────────────────────────────── + +fn parse_uuid(s: &str, field: &str) -> Result { + uuid::Uuid::parse_str(s) + .map_err(|_| DomainError::Internal(format!("invalid uuid for {field}: {s}"))) +} + +impl TryFrom for DomainEvent { + type Error = DomainError; + + fn try_from(p: EventPayload) -> Result { + Ok(match p { + EventPayload::ThoughtCreated { thought_id, user_id, in_reply_to_id } => DomainEvent::ThoughtCreated { + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + in_reply_to_id: in_reply_to_id + .map(|s| parse_uuid(&s, "in_reply_to_id").map(ThoughtId::from_uuid)) + .transpose()?, + }, + EventPayload::ThoughtDeleted { thought_id, user_id } => DomainEvent::ThoughtDeleted { + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + }, + EventPayload::ThoughtUpdated { thought_id, user_id } => DomainEvent::ThoughtUpdated { + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + }, + EventPayload::LikeAdded { like_id, user_id, thought_id } => DomainEvent::LikeAdded { + like_id: LikeId::from_uuid(parse_uuid(&like_id, "like_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + }, + EventPayload::LikeRemoved { user_id, thought_id } => DomainEvent::LikeRemoved { + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + }, + EventPayload::BoostAdded { boost_id, user_id, thought_id } => DomainEvent::BoostAdded { + boost_id: BoostId::from_uuid(parse_uuid(&boost_id, "boost_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + }, + EventPayload::BoostRemoved { user_id, thought_id } => DomainEvent::BoostRemoved { + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + }, + EventPayload::FollowRequested { follower_id, following_id } => DomainEvent::FollowRequested { + follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), + following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), + }, + EventPayload::FollowAccepted { follower_id, following_id } => DomainEvent::FollowAccepted { + follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), + following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), + }, + EventPayload::FollowRejected { follower_id, following_id } => DomainEvent::FollowRejected { + follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), + following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), + }, + EventPayload::Unfollowed { follower_id, following_id } => DomainEvent::Unfollowed { + follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), + following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), + }, + EventPayload::UserBlocked { blocker_id, blocked_id } => DomainEvent::UserBlocked { + blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?), + blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?), + }, + }) + } +} + +// ── NatsEventPublisher ──────────────────────────────────────────────────── + +pub struct NatsEventPublisher { + client: async_nats::Client, +} + +impl NatsEventPublisher { + pub fn new(client: async_nats::Client) -> Self { Self { client } } +} + +#[async_trait] +impl EventPublisher for NatsEventPublisher { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + let payload = EventPayload::from(event); + let subject = payload.subject(); + let bytes = serde_json::to_vec(&payload) + .map_err(|e| DomainError::Internal(e.to_string()))?; + self.client + .publish(subject, bytes.into()) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } +} + +// ── NatsEventConsumer ───────────────────────────────────────────────────── + +pub struct NatsEventConsumer { + client: async_nats::Client, +} + +impl NatsEventConsumer { + pub fn new(client: async_nats::Client) -> Self { Self { client } } +} + +impl EventConsumer for NatsEventConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let client = self.client.clone(); + Box::pin(async_stream::try_stream! { + let mut sub = client + .subscribe(">") + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + use futures::StreamExt; + while let Some(msg) = sub.next().await { + let payload = match serde_json::from_slice::(&msg.payload) { + Ok(p) => p, + Err(e) => { + tracing::warn!("failed to deserialize event payload: {e}"); + continue; + } + }; + let event = match DomainEvent::try_from(payload) { + Ok(e) => e, + Err(e) => { + tracing::warn!("failed to convert payload to domain event: {e}"); + continue; + } + }; + // Basic NATS has no ack/nack — at-most-once delivery + yield EventEnvelope { + event, + ack: Box::new(|| {}), + nack: Box::new(|| {}), + }; + } + }) + } +} +``` + +- [ ] **Run:** `cargo test -p nats` + Expected: 2 tests pass. + +- [ ] **Commit:** +```bash +git add crates/adapters/nats/ +git commit -m "feat(nats): NatsEventPublisher and NatsEventConsumer with payload conversion" +``` + +--- + +### Task 3: worker — NotificationHandler + FederationHandler + +**Files:** +- Modify: `crates/worker/Cargo.toml` +- Create: `crates/worker/src/handlers.rs` + +- [ ] **Write `crates/worker/Cargo.toml`:** + +```toml +[package] +name = "worker" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "thoughts-worker" +path = "src/main.rs" + +[dependencies] +domain = { workspace = true } +nats = { workspace = true } +event-payload = { workspace = true } +postgres = { workspace = true } +async-nats = { workspace = true } +tokio = { workspace = true, features = ["full"] } +futures = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +dotenvy = { workspace = true } +serde_json = { workspace = true } +chrono = { workspace = true } +``` + +- [ ] **Write tests** at bottom of `crates/worker/src/handlers.rs`: + +```rust +#[cfg(test)] +mod tests { + use super::*; + use domain::{ + models::{thought::{Thought, Visibility}, user::User}, + testing::TestStore, + value_objects::*, + }; + use std::sync::Arc; + + fn alice() -> User { + User::new_local( + UserId::new(), + Username::new("alice").unwrap(), + Email::new("alice@ex.com").unwrap(), + PasswordHash("h".into()), + ) + } + + #[tokio::test] + async fn like_added_creates_notification_for_thought_author() { + let store = TestStore::default(); + let alice = alice(); + let bob_id = UserId::new(); + + // alice posts a thought + let thought = Thought::new_local( + ThoughtId::new(), alice.id.clone(), + Content::new_local("hello").unwrap(), + None, Visibility::Public, None, false, + ); + store.users.lock().unwrap().push(alice.clone()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let handler = NotificationHandler { + thoughts: Arc::new(store.clone()), + notifications: Arc::new(store.clone()), + }; + + // bob likes alice's thought + handler.handle(&DomainEvent::LikeAdded { + like_id: LikeId::new(), + user_id: bob_id.clone(), + thought_id: thought.id.clone(), + }).await.unwrap(); + + let notifs = store.notifications.lock().unwrap(); + assert_eq!(notifs.len(), 1); + assert_eq!(notifs[0].user_id, alice.id); // notification goes to alice + assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Like)); + } + + #[tokio::test] + async fn self_like_does_not_create_notification() { + let store = TestStore::default(); + let alice = alice(); + let thought = Thought::new_local( + ThoughtId::new(), alice.id.clone(), + Content::new_local("hello").unwrap(), + None, Visibility::Public, None, false, + ); + store.users.lock().unwrap().push(alice.clone()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let handler = NotificationHandler { + thoughts: Arc::new(store.clone()), + notifications: Arc::new(store.clone()), + }; + + handler.handle(&DomainEvent::LikeAdded { + like_id: LikeId::new(), + user_id: alice.id.clone(), // alice likes her own thought + thought_id: thought.id.clone(), + }).await.unwrap(); + + assert!(store.notifications.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn follow_accepted_creates_notification() { + let store = TestStore::default(); + let alice = alice(); + let bob_id = UserId::new(); + store.users.lock().unwrap().push(alice.clone()); + + let handler = NotificationHandler { + thoughts: Arc::new(store.clone()), + notifications: Arc::new(store.clone()), + }; + + // bob follows alice (alice gets notified) + handler.handle(&DomainEvent::FollowAccepted { + follower_id: bob_id.clone(), + following_id: alice.id.clone(), + }).await.unwrap(); + + let notifs = store.notifications.lock().unwrap(); + assert_eq!(notifs.len(), 1); + assert_eq!(notifs[0].user_id, alice.id); + assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Follow)); + } +} +``` + +- [ ] **Run:** `cargo test -p worker` — Expected: FAIL (handlers.rs doesn't exist yet). + +- [ ] **Create `crates/worker/src/handlers.rs`:** + +```rust +use std::sync::Arc; +use chrono::Utc; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::notification::{Notification, NotificationType}, + ports::{NotificationRepository, ThoughtRepository}, + value_objects::NotificationId, +}; + +/// Handles domain events that should create notifications for users. +pub struct NotificationHandler { + pub thoughts: Arc, + pub notifications: Arc, +} + +impl NotificationHandler { + pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + match event { + DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => { + let thought = match self.thoughts.find_by_id(thought_id).await? { + Some(t) => t, + None => return Ok(()), // thought deleted — skip + }; + if thought.user_id == *user_id { return Ok(()); } // no self-notifications + self.notifications.save(&Notification { + id: NotificationId::new(), + user_id: thought.user_id, + notification_type: NotificationType::Like, + from_user_id: Some(user_id.clone()), + thought_id: Some(thought_id.clone()), + read: false, + created_at: Utc::now(), + }).await + } + DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => { + let thought = match self.thoughts.find_by_id(thought_id).await? { + Some(t) => t, + None => return Ok(()), + }; + if thought.user_id == *user_id { return Ok(()); } + self.notifications.save(&Notification { + id: NotificationId::new(), + user_id: thought.user_id, + notification_type: NotificationType::Boost, + from_user_id: Some(user_id.clone()), + thought_id: Some(thought_id.clone()), + read: false, + created_at: Utc::now(), + }).await + } + DomainEvent::FollowAccepted { follower_id, following_id } => { + // The person being followed (following_id) gets notified + self.notifications.save(&Notification { + id: NotificationId::new(), + user_id: following_id.clone(), + notification_type: NotificationType::Follow, + from_user_id: Some(follower_id.clone()), + thought_id: None, + read: false, + created_at: Utc::now(), + }).await + } + // All other events: no notification needed in Plan 3 + _ => Ok(()), + } + } +} + +/// Stub handler for ActivityPub federation — implemented in Plan 4. +pub struct FederationHandler; + +impl FederationHandler { + pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + tracing::debug!(event = ?event, "federation handler (stub — Plan 4)"); + Ok(()) + } +} +``` + +- [ ] **Run:** `cargo test -p worker` — Expected: 3 tests pass. + +- [ ] **Commit:** +```bash +git add crates/worker/ +git commit -m "feat(worker): NotificationHandler and FederationHandler stub" +``` + +--- + +### Task 4: worker main binary + +**Files:** +- Modify: `crates/worker/src/main.rs` + +- [ ] **Write `crates/worker/src/main.rs`:** + +```rust +mod handlers; + +use std::sync::Arc; +use futures::StreamExt; +use sqlx::PgPool; +use domain::ports::EventConsumer; + +#[tokio::main] +async fn main() { + dotenvy::dotenv().ok(); + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); + let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()); + + tracing::info!("Connecting to postgres..."); + let pool = PgPool::connect(&database_url).await.expect("DB connect failed"); + + tracing::info!("Connecting to NATS at {nats_url}..."); + let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed"); + let consumer = nats::NatsEventConsumer::new(nats_client); + + let notification_handler = handlers::NotificationHandler { + thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())), + notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())), + }; + let federation_handler = handlers::FederationHandler; + + tracing::info!("Worker started, consuming events..."); + + let mut stream = consumer.consume(); + while let Some(result) = stream.next().await { + match result { + Ok(envelope) => { + let event = &envelope.event; + tracing::debug!(subject = ?event, "received event"); + + let n_result = notification_handler.handle(event).await; + let f_result = federation_handler.handle(event).await; + + if n_result.is_ok() && f_result.is_ok() { + (envelope.ack)(); + } else { + if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); } + if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); } + (envelope.nack)(); + } + } + Err(e) => { + tracing::error!("consumer error: {e}"); + } + } + } +} +``` + +- [ ] **Run:** `cargo build -p worker` + Expected: compiles cleanly (binary `thoughts-worker` produced). + +- [ ] **Smoke test** (requires NATS running): +```bash +# Terminal 1: start NATS if not already running +docker run -d --name nats -p 4222:4222 nats:latest || true + +# Terminal 2: start worker +DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \ +RUST_LOG=info \ +cargo run --bin thoughts-worker & +sleep 2 + +# Terminal 3: start API server +DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev cargo run -p presentation & +sleep 2 + +# Create a user, post a thought, like it — check that worker logs "received event" +TOKEN=$(curl -s -X POST http://localhost:3000/auth/register \ + -H 'content-type: application/json' \ + -d '{"username":"evttest","email":"evt@test.com","password":"pw"}' | jq -r .token) + +TID=$(curl -s -X POST http://localhost:3000/thoughts \ + -H 'content-type: application/json' \ + -H "Authorization: Bearer $TOKEN" \ + -d '{"content":"event test"}' | jq -r .id) + +curl -s -X POST http://localhost:3000/thoughts/$TID/like \ + -H "Authorization: Bearer $TOKEN" + +kill %1 %2 2>/dev/null +``` + +Expected: worker logs show `received event` for the like. No errors. + +- [ ] **Commit:** +```bash +git add crates/worker/src/main.rs +git commit -m "feat(worker): consumer loop binary connecting NATS to handlers" +``` + +--- + +### Task 5: Presentation — swap NoOp for real NatsEventPublisher + +**Files:** +- Modify: `crates/presentation/Cargo.toml` +- Modify: `crates/presentation/src/lib.rs` +- Modify: `crates/presentation/src/main.rs` + +When NATS_URL is not set, fall back to the `NoOpEventPublisher` so the API still starts without NATS. Use an env var `NATS_URL` — if set, use real publisher; if absent, log a warning and use no-op. + +- [ ] **Add `nats` to `crates/presentation/Cargo.toml` deps:** + +```toml +nats = { workspace = true } +async-nats = { workspace = true } +``` + +- [ ] **Update `crates/presentation/src/lib.rs`** — replace the `NoOpEventPublisher` struct and `build_state` function with one that optionally connects to NATS: + +Replace the existing `build_state` signature with an async version: + +```rust +use std::sync::Arc; +use sqlx::PgPool; +use state::AppState; +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; + +pub mod errors; +pub mod extractors; +pub mod handlers; +pub mod routes; +pub mod state; + +use postgres_search::PgSearchRepository; + +struct NoOpEventPublisher; +#[async_trait] +impl EventPublisher for NoOpEventPublisher { + async fn publish(&self, _e: &DomainEvent) -> Result<(), DomainError> { Ok(()) } +} + +pub async fn build_state(pool: PgPool, jwt_secret: String) -> AppState { + let event_publisher: Arc = match std::env::var("NATS_URL") { + Ok(url) => { + match async_nats::connect(&url).await { + Ok(client) => { + tracing::info!("Connected to NATS at {url}"); + Arc::new(nats::NatsEventPublisher::new(client)) + } + Err(e) => { + tracing::warn!("Failed to connect to NATS at {url}: {e} — using no-op publisher"); + Arc::new(NoOpEventPublisher) + } + } + } + Err(_) => { + tracing::info!("NATS_URL not set — using no-op event publisher"); + Arc::new(NoOpEventPublisher) + } + }; + + AppState { + users: Arc::new(postgres::user::PgUserRepository::new(pool.clone())), + thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())), + likes: Arc::new(postgres::like::PgLikeRepository::new(pool.clone())), + boosts: Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())), + follows: Arc::new(postgres::follow::PgFollowRepository::new(pool.clone())), + blocks: Arc::new(postgres::block::PgBlockRepository::new(pool.clone())), + tags: Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), + api_keys: Arc::new(postgres::api_key::PgApiKeyRepository::new(pool.clone())), + top_friends: Arc::new(postgres::top_friend::PgTopFriendRepository::new(pool.clone())), + notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())), + remote_actors: Arc::new(postgres::remote_actor::PgRemoteActorRepository::new(pool.clone())), + feed: Arc::new(postgres::feed::PgFeedRepository::new(pool.clone())), + search: Arc::new(PgSearchRepository::new(pool.clone())), + auth: Arc::new(auth::JwtAuthService::new(jwt_secret, 86400 * 30)), + hasher: Arc::new(auth::Argon2PasswordHasher), + events: event_publisher, + } +} +``` + +- [ ] **Update `crates/presentation/src/main.rs`** — `build_state` is now async, so await it: + +```rust +use sqlx::PgPool; +use tower_http::cors::CorsLayer; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() { + dotenvy::dotenv().ok(); + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); + let jwt_secret = std::env::var("JWT_SECRET").expect("JWT_SECRET required"); + let port = std::env::var("PORT").unwrap_or_else(|_| "3000".into()); + + let pool = PgPool::connect(&database_url).await.expect("DB connect failed"); + sqlx::migrate!("../adapters/postgres/migrations").run(&pool).await.expect("Migrations failed"); + + let state = presentation::build_state(pool, jwt_secret).await; // note: .await + let app = presentation::routes::router() + .with_state(state) + .layer(CorsLayer::permissive()); + + let addr = format!("0.0.0.0:{port}"); + tracing::info!("Listening on {addr}"); + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} +``` + +- [ ] **Run:** `cargo build -p presentation` + Expected: clean build. + +- [ ] **Verify no-op fallback works** (without NATS running): +```bash +DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev \ +RUST_LOG=info cargo run -p presentation & +sleep 2 +# Should log: "NATS_URL not set — using no-op event publisher" +curl -s -X POST http://localhost:3000/auth/register \ + -H 'content-type: application/json' \ + -d '{"username":"natstest","email":"nats@test.com","password":"pw"}' | jq .token +kill %1 +``` + +- [ ] **Run full test suite:** +```bash +DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace +``` +Expected: all tests pass (52 + new worker tests = 55+). + +- [ ] **Commit:** +```bash +git add crates/presentation/ +git commit -m "feat(presentation): NatsEventPublisher with no-op fallback when NATS_URL unset" +``` + +--- + +## Self-Review + +**Spec coverage:** +- ✅ event-payload: serializable EventPayload enum, subject(), From/TryFrom conversions (Task 1) +- ✅ nats: NatsEventPublisher implementing EventPublisher (Task 2) +- ✅ nats: NatsEventConsumer implementing EventConsumer via BoxStream (Task 2) +- ✅ worker: NotificationHandler (LikeAdded, BoostAdded, FollowAccepted → notifications) (Task 3) +- ✅ worker: FederationHandler stub (Task 3) +- ✅ worker: consumer loop binary (Task 4) +- ✅ presentation: real NATS publisher with graceful no-op fallback (Task 5) +- ✅ event-publisher: stays as stub (correct — deferred per plan) + +**Placeholder scan:** None — all code blocks complete. + +**Type consistency:** +- `NatsEventPublisher::new(client: async_nats::Client)` — matches usage in presentation lib.rs and worker main.rs +- `NatsEventConsumer::new(client: async_nats::Client)` — matches worker main.rs +- `NotificationHandler { thoughts, notifications }` — field names match handler usage in main.rs +- `build_state` is now `async fn` — main.rs correctly awaits it +- `EventPayload::from(&DomainEvent)` — implemented in nats crate (which sees both types) + +**Notes:** +- Basic NATS (at-most-once delivery) is used — JetStream (exactly-once) deferred to later +- Worker Cargo.toml includes `postgres` internal crate for database access in handlers +- `crates/adapters/nats` has Rust module name `nats` but package name `nats` — import as `use nats::...`