# Thoughts v2 — Plan 3: Events + Worker (NATS) > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Wire real async event processing — use cases publish domain events to NATS, a worker binary subscribes and runs handlers (NotificationHandler creates DB records; FederationHandler is stubbed for Plan 4). **Architecture:** `event-payload/` holds the serializable NATS wire types. `nats/` wraps `async-nats` and implements both `EventPublisher` (publish to NATS) and `EventConsumer` (subscribe, yield `EventEnvelope` stream). `worker/` is a standalone binary that consumes events and dispatches to handlers. `presentation/` swaps its `NoOpEventPublisher` for the real NATS publisher. `event-publisher/` stays a stub (future fan-out to multiple backends). **Tech Stack:** Rust, async-nats 0.38, serde_json, futures, async-stream, tokio **Prerequisites:** NATS server running locally. Start with: ```bash docker run -d --name nats -p 4222:4222 nats:latest # or add to docker-compose if preferred ``` --- ## File Map ``` Modified: Cargo.toml ← add async-nats, async-stream to workspace.dependencies Modified: crates/adapters/event-payload/Cargo.toml ← add deps Modified: crates/adapters/event-payload/src/lib.rs ← EventPayload enum + subject() + From<&DomainEvent> Modified: crates/adapters/nats/Cargo.toml ← add deps Modified: crates/adapters/nats/src/lib.rs ← NatsEventPublisher + NatsEventConsumer Modified: crates/worker/Cargo.toml ← add deps, add [[bin]] Create: crates/worker/src/handlers.rs ← NotificationHandler, FederationHandler (stub) Modified: crates/worker/src/main.rs ← consumer loop binary Modified: crates/presentation/src/lib.rs ← swap NoOp for NatsEventPublisher Modified: crates/presentation/Cargo.toml ← add nats dep ``` --- ### Task 1: Workspace deps + event-payload crate **Files:** - Modify: `Cargo.toml` (root workspace) - Modify: `crates/adapters/event-payload/Cargo.toml` - Modify: `crates/adapters/event-payload/src/lib.rs` - [ ] **Add to root `Cargo.toml` `[workspace.dependencies]`:** ```toml async-nats = "0.38" async-stream = "0.3" event-payload = { path = "crates/adapters/event-payload" } event-publisher = { path = "crates/adapters/event-publisher" } nats = { path = "crates/adapters/nats" } ``` Check if `event-payload`, `event-publisher`, `nats` are already listed — they should be from Plan 1 scaffolding. If so, skip those lines and only add `async-nats` and `async-stream`. - [ ] **Write `crates/adapters/event-payload/Cargo.toml`:** ```toml [package] name = "event-payload" version = "0.1.0" edition = "2021" [dependencies] serde = { workspace = true } serde_json = { workspace = true } ``` - [ ] **Write `crates/adapters/event-payload/src/lib.rs`:** ```rust use serde::{Deserialize, Serialize}; /// Serializable mirror of domain::events::DomainEvent. /// All IDs are Strings (UUID hex) — no domain type dependencies. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", content = "data")] pub enum EventPayload { ThoughtCreated { thought_id: String, user_id: String, in_reply_to_id: Option, }, ThoughtDeleted { thought_id: String, user_id: String, }, ThoughtUpdated { thought_id: String, user_id: String, }, LikeAdded { like_id: String, user_id: String, thought_id: String, }, LikeRemoved { user_id: String, thought_id: String, }, BoostAdded { boost_id: String, user_id: String, thought_id: String, }, BoostRemoved { user_id: String, thought_id: String, }, FollowRequested { follower_id: String, following_id: String, }, FollowAccepted { follower_id: String, following_id: String, }, FollowRejected { follower_id: String, following_id: String, }, Unfollowed { follower_id: String, following_id: String, }, UserBlocked { blocker_id: String, blocked_id: String, }, } impl EventPayload { /// Returns the NATS subject for this event. pub fn subject(&self) -> &'static str { match self { Self::ThoughtCreated { .. } => "thoughts.created", Self::ThoughtDeleted { .. } => "thoughts.deleted", Self::ThoughtUpdated { .. } => "thoughts.updated", Self::LikeAdded { .. } => "likes.added", Self::LikeRemoved { .. } => "likes.removed", Self::BoostAdded { .. } => "boosts.added", Self::BoostRemoved { .. } => "boosts.removed", Self::FollowRequested { .. } => "follows.requested", Self::FollowAccepted { .. } => "follows.accepted", Self::FollowRejected { .. } => "follows.rejected", Self::Unfollowed { .. } => "follows.removed", Self::UserBlocked { .. } => "users.blocked", } } } #[cfg(test)] mod tests { use super::*; #[test] fn thought_created_roundtrip() { let p = EventPayload::ThoughtCreated { thought_id: "abc".into(), user_id: "def".into(), in_reply_to_id: None, }; let json = serde_json::to_string(&p).unwrap(); let back: EventPayload = serde_json::from_str(&json).unwrap(); assert_eq!(back.subject(), "thoughts.created"); } #[test] fn all_subjects_are_unique() { let samples: &[EventPayload] = &[ EventPayload::ThoughtCreated { thought_id: "a".into(), user_id: "b".into(), in_reply_to_id: None }, EventPayload::ThoughtDeleted { thought_id: "a".into(), user_id: "b".into() }, EventPayload::ThoughtUpdated { thought_id: "a".into(), user_id: "b".into() }, EventPayload::LikeAdded { like_id: "a".into(), user_id: "b".into(), thought_id: "c".into() }, EventPayload::LikeRemoved { user_id: "b".into(), thought_id: "c".into() }, EventPayload::BoostAdded { boost_id: "a".into(), user_id: "b".into(), thought_id: "c".into() }, EventPayload::BoostRemoved { user_id: "b".into(), thought_id: "c".into() }, EventPayload::FollowRequested { follower_id: "a".into(), following_id: "b".into() }, EventPayload::FollowAccepted { follower_id: "a".into(), following_id: "b".into() }, EventPayload::FollowRejected { follower_id: "a".into(), following_id: "b".into() }, EventPayload::Unfollowed { follower_id: "a".into(), following_id: "b".into() }, EventPayload::UserBlocked { blocker_id: "a".into(), blocked_id: "b".into() }, ]; let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect(); subjects.sort(); subjects.dedup(); assert_eq!(subjects.len(), samples.len(), "each event must have a unique subject"); } } ``` - [ ] **Run:** `cargo test -p event-payload` Expected: 2 tests pass. - [ ] **Commit:** ```bash git add Cargo.toml crates/adapters/event-payload/ git commit -m "feat(event-payload): serializable NATS event payload types" ``` --- ### Task 2: nats crate — NatsEventPublisher + NatsEventConsumer **Files:** - Modify: `crates/adapters/nats/Cargo.toml` - Modify: `crates/adapters/nats/src/lib.rs` - [ ] **Write `crates/adapters/nats/Cargo.toml`:** ```toml [package] name = "nats" version = "0.1.0" edition = "2021" [dependencies] domain = { workspace = true } event-payload = { workspace = true } async-nats = { workspace = true } async-stream = { workspace = true } serde_json = { workspace = true } futures = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } ``` - [ ] **Write test** at bottom of `crates/adapters/nats/src/lib.rs`: ```rust #[cfg(test)] mod tests { use super::*; use domain::value_objects::{ThoughtId, UserId}; #[test] fn payload_from_domain_event_has_correct_subject() { let event = domain::events::DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: UserId::new(), in_reply_to_id: None, }; let payload = EventPayload::from(&event); assert_eq!(payload.subject(), "thoughts.created"); } #[test] fn domain_event_roundtrip_via_payload() { let uid = UserId::new(); let tid = ThoughtId::new(); let event = domain::events::DomainEvent::LikeAdded { like_id: domain::value_objects::LikeId::new(), user_id: uid.clone(), thought_id: tid.clone(), }; let payload = EventPayload::from(&event); let back = domain::events::DomainEvent::try_from(payload).unwrap(); if let domain::events::DomainEvent::LikeAdded { user_id, thought_id, .. } = back { assert_eq!(user_id, uid); assert_eq!(thought_id, tid); } else { panic!("wrong variant"); } } } ``` - [ ] **Run:** `cargo test -p nats` — Expected: FAIL (lib.rs is empty). - [ ] **Write `crates/adapters/nats/src/lib.rs`:** ```rust use async_trait::async_trait; use domain::{ errors::DomainError, events::{DomainEvent, EventEnvelope}, ports::{EventConsumer, EventPublisher}, value_objects::{BoostId, LikeId, ThoughtId, UserId}, }; use event_payload::EventPayload; use futures::stream::BoxStream; // ── DomainEvent → EventPayload ───────────────────────────────────────────── impl From<&DomainEvent> for EventPayload { fn from(e: &DomainEvent) -> Self { match e { DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => Self::ThoughtCreated { thought_id: thought_id.to_string(), user_id: user_id.to_string(), in_reply_to_id: in_reply_to_id.as_ref().map(|x| x.to_string()), }, DomainEvent::ThoughtDeleted { thought_id, user_id } => Self::ThoughtDeleted { thought_id: thought_id.to_string(), user_id: user_id.to_string(), }, DomainEvent::ThoughtUpdated { thought_id, user_id } => Self::ThoughtUpdated { thought_id: thought_id.to_string(), user_id: user_id.to_string(), }, DomainEvent::LikeAdded { like_id, user_id, thought_id } => Self::LikeAdded { like_id: like_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(), }, DomainEvent::LikeRemoved { user_id, thought_id } => Self::LikeRemoved { user_id: user_id.to_string(), thought_id: thought_id.to_string(), }, DomainEvent::BoostAdded { boost_id, user_id, thought_id } => Self::BoostAdded { boost_id: boost_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(), }, DomainEvent::BoostRemoved { user_id, thought_id } => Self::BoostRemoved { user_id: user_id.to_string(), thought_id: thought_id.to_string(), }, DomainEvent::FollowRequested { follower_id, following_id } => Self::FollowRequested { follower_id: follower_id.to_string(), following_id: following_id.to_string(), }, DomainEvent::FollowAccepted { follower_id, following_id } => Self::FollowAccepted { follower_id: follower_id.to_string(), following_id: following_id.to_string(), }, DomainEvent::FollowRejected { follower_id, following_id } => Self::FollowRejected { follower_id: follower_id.to_string(), following_id: following_id.to_string(), }, DomainEvent::Unfollowed { follower_id, following_id } => Self::Unfollowed { follower_id: follower_id.to_string(), following_id: following_id.to_string(), }, DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked { blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(), }, } } } // ── EventPayload → DomainEvent ───────────────────────────────────────────── fn parse_uuid(s: &str, field: &str) -> Result { uuid::Uuid::parse_str(s) .map_err(|_| DomainError::Internal(format!("invalid uuid for {field}: {s}"))) } impl TryFrom for DomainEvent { type Error = DomainError; fn try_from(p: EventPayload) -> Result { Ok(match p { EventPayload::ThoughtCreated { thought_id, user_id, in_reply_to_id } => DomainEvent::ThoughtCreated { thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), in_reply_to_id: in_reply_to_id .map(|s| parse_uuid(&s, "in_reply_to_id").map(ThoughtId::from_uuid)) .transpose()?, }, EventPayload::ThoughtDeleted { thought_id, user_id } => DomainEvent::ThoughtDeleted { thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), }, EventPayload::ThoughtUpdated { thought_id, user_id } => DomainEvent::ThoughtUpdated { thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), }, EventPayload::LikeAdded { like_id, user_id, thought_id } => DomainEvent::LikeAdded { like_id: LikeId::from_uuid(parse_uuid(&like_id, "like_id")?), user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), }, EventPayload::LikeRemoved { user_id, thought_id } => DomainEvent::LikeRemoved { user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), }, EventPayload::BoostAdded { boost_id, user_id, thought_id } => DomainEvent::BoostAdded { boost_id: BoostId::from_uuid(parse_uuid(&boost_id, "boost_id")?), user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), }, EventPayload::BoostRemoved { user_id, thought_id } => DomainEvent::BoostRemoved { user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), }, EventPayload::FollowRequested { follower_id, following_id } => DomainEvent::FollowRequested { follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), }, EventPayload::FollowAccepted { follower_id, following_id } => DomainEvent::FollowAccepted { follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), }, EventPayload::FollowRejected { follower_id, following_id } => DomainEvent::FollowRejected { follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), }, EventPayload::Unfollowed { follower_id, following_id } => DomainEvent::Unfollowed { follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), }, EventPayload::UserBlocked { blocker_id, blocked_id } => DomainEvent::UserBlocked { blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?), blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?), }, }) } } // ── NatsEventPublisher ──────────────────────────────────────────────────── pub struct NatsEventPublisher { client: async_nats::Client, } impl NatsEventPublisher { pub fn new(client: async_nats::Client) -> Self { Self { client } } } #[async_trait] impl EventPublisher for NatsEventPublisher { async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { let payload = EventPayload::from(event); let subject = payload.subject(); let bytes = serde_json::to_vec(&payload) .map_err(|e| DomainError::Internal(e.to_string()))?; self.client .publish(subject, bytes.into()) .await .map_err(|e| DomainError::Internal(e.to_string())) } } // ── NatsEventConsumer ───────────────────────────────────────────────────── pub struct NatsEventConsumer { client: async_nats::Client, } impl NatsEventConsumer { pub fn new(client: async_nats::Client) -> Self { Self { client } } } impl EventConsumer for NatsEventConsumer { fn consume(&self) -> BoxStream<'_, Result> { let client = self.client.clone(); Box::pin(async_stream::try_stream! { let mut sub = client .subscribe(">") .await .map_err(|e| DomainError::Internal(e.to_string()))?; use futures::StreamExt; while let Some(msg) = sub.next().await { let payload = match serde_json::from_slice::(&msg.payload) { Ok(p) => p, Err(e) => { tracing::warn!("failed to deserialize event payload: {e}"); continue; } }; let event = match DomainEvent::try_from(payload) { Ok(e) => e, Err(e) => { tracing::warn!("failed to convert payload to domain event: {e}"); continue; } }; // Basic NATS has no ack/nack — at-most-once delivery yield EventEnvelope { event, ack: Box::new(|| {}), nack: Box::new(|| {}), }; } }) } } ``` - [ ] **Run:** `cargo test -p nats` Expected: 2 tests pass. - [ ] **Commit:** ```bash git add crates/adapters/nats/ git commit -m "feat(nats): NatsEventPublisher and NatsEventConsumer with payload conversion" ``` --- ### Task 3: worker — NotificationHandler + FederationHandler **Files:** - Modify: `crates/worker/Cargo.toml` - Create: `crates/worker/src/handlers.rs` - [ ] **Write `crates/worker/Cargo.toml`:** ```toml [package] name = "worker" version = "0.1.0" edition = "2021" [[bin]] name = "thoughts-worker" path = "src/main.rs" [dependencies] domain = { workspace = true } nats = { workspace = true } event-payload = { workspace = true } postgres = { workspace = true } async-nats = { workspace = true } tokio = { workspace = true, features = ["full"] } futures = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } dotenvy = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } ``` - [ ] **Write tests** at bottom of `crates/worker/src/handlers.rs`: ```rust #[cfg(test)] mod tests { use super::*; use domain::{ models::{thought::{Thought, Visibility}, user::User}, testing::TestStore, value_objects::*, }; use std::sync::Arc; fn alice() -> User { User::new_local( UserId::new(), Username::new("alice").unwrap(), Email::new("alice@ex.com").unwrap(), PasswordHash("h".into()), ) } #[tokio::test] async fn like_added_creates_notification_for_thought_author() { let store = TestStore::default(); let alice = alice(); let bob_id = UserId::new(); // alice posts a thought let thought = Thought::new_local( ThoughtId::new(), alice.id.clone(), Content::new_local("hello").unwrap(), None, Visibility::Public, None, false, ); store.users.lock().unwrap().push(alice.clone()); store.thoughts.lock().unwrap().push(thought.clone()); let handler = NotificationHandler { thoughts: Arc::new(store.clone()), notifications: Arc::new(store.clone()), }; // bob likes alice's thought handler.handle(&DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: bob_id.clone(), thought_id: thought.id.clone(), }).await.unwrap(); let notifs = store.notifications.lock().unwrap(); assert_eq!(notifs.len(), 1); assert_eq!(notifs[0].user_id, alice.id); // notification goes to alice assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Like)); } #[tokio::test] async fn self_like_does_not_create_notification() { let store = TestStore::default(); let alice = alice(); let thought = Thought::new_local( ThoughtId::new(), alice.id.clone(), Content::new_local("hello").unwrap(), None, Visibility::Public, None, false, ); store.users.lock().unwrap().push(alice.clone()); store.thoughts.lock().unwrap().push(thought.clone()); let handler = NotificationHandler { thoughts: Arc::new(store.clone()), notifications: Arc::new(store.clone()), }; handler.handle(&DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: alice.id.clone(), // alice likes her own thought thought_id: thought.id.clone(), }).await.unwrap(); assert!(store.notifications.lock().unwrap().is_empty()); } #[tokio::test] async fn follow_accepted_creates_notification() { let store = TestStore::default(); let alice = alice(); let bob_id = UserId::new(); store.users.lock().unwrap().push(alice.clone()); let handler = NotificationHandler { thoughts: Arc::new(store.clone()), notifications: Arc::new(store.clone()), }; // bob follows alice (alice gets notified) handler.handle(&DomainEvent::FollowAccepted { follower_id: bob_id.clone(), following_id: alice.id.clone(), }).await.unwrap(); let notifs = store.notifications.lock().unwrap(); assert_eq!(notifs.len(), 1); assert_eq!(notifs[0].user_id, alice.id); assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Follow)); } } ``` - [ ] **Run:** `cargo test -p worker` — Expected: FAIL (handlers.rs doesn't exist yet). - [ ] **Create `crates/worker/src/handlers.rs`:** ```rust use std::sync::Arc; use chrono::Utc; use domain::{ errors::DomainError, events::DomainEvent, models::notification::{Notification, NotificationType}, ports::{NotificationRepository, ThoughtRepository}, value_objects::NotificationId, }; /// Handles domain events that should create notifications for users. pub struct NotificationHandler { pub thoughts: Arc, pub notifications: Arc, } impl NotificationHandler { pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { match event { DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => { let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) => t, None => return Ok(()), // thought deleted — skip }; if thought.user_id == *user_id { return Ok(()); } // no self-notifications self.notifications.save(&Notification { id: NotificationId::new(), user_id: thought.user_id, notification_type: NotificationType::Like, from_user_id: Some(user_id.clone()), thought_id: Some(thought_id.clone()), read: false, created_at: Utc::now(), }).await } DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => { let thought = match self.thoughts.find_by_id(thought_id).await? { Some(t) => t, None => return Ok(()), }; if thought.user_id == *user_id { return Ok(()); } self.notifications.save(&Notification { id: NotificationId::new(), user_id: thought.user_id, notification_type: NotificationType::Boost, from_user_id: Some(user_id.clone()), thought_id: Some(thought_id.clone()), read: false, created_at: Utc::now(), }).await } DomainEvent::FollowAccepted { follower_id, following_id } => { // The person being followed (following_id) gets notified self.notifications.save(&Notification { id: NotificationId::new(), user_id: following_id.clone(), notification_type: NotificationType::Follow, from_user_id: Some(follower_id.clone()), thought_id: None, read: false, created_at: Utc::now(), }).await } // All other events: no notification needed in Plan 3 _ => Ok(()), } } } /// Stub handler for ActivityPub federation — implemented in Plan 4. pub struct FederationHandler; impl FederationHandler { pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { tracing::debug!(event = ?event, "federation handler (stub — Plan 4)"); Ok(()) } } ``` - [ ] **Run:** `cargo test -p worker` — Expected: 3 tests pass. - [ ] **Commit:** ```bash git add crates/worker/ git commit -m "feat(worker): NotificationHandler and FederationHandler stub" ``` --- ### Task 4: worker main binary **Files:** - Modify: `crates/worker/src/main.rs` - [ ] **Write `crates/worker/src/main.rs`:** ```rust mod handlers; use std::sync::Arc; use futures::StreamExt; use sqlx::PgPool; use domain::ports::EventConsumer; #[tokio::main] async fn main() { dotenvy::dotenv().ok(); tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .init(); let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()); tracing::info!("Connecting to postgres..."); let pool = PgPool::connect(&database_url).await.expect("DB connect failed"); tracing::info!("Connecting to NATS at {nats_url}..."); let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed"); let consumer = nats::NatsEventConsumer::new(nats_client); let notification_handler = handlers::NotificationHandler { thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())), notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())), }; let federation_handler = handlers::FederationHandler; tracing::info!("Worker started, consuming events..."); let mut stream = consumer.consume(); while let Some(result) = stream.next().await { match result { Ok(envelope) => { let event = &envelope.event; tracing::debug!(subject = ?event, "received event"); let n_result = notification_handler.handle(event).await; let f_result = federation_handler.handle(event).await; if n_result.is_ok() && f_result.is_ok() { (envelope.ack)(); } else { if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); } if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); } (envelope.nack)(); } } Err(e) => { tracing::error!("consumer error: {e}"); } } } } ``` - [ ] **Run:** `cargo build -p worker` Expected: compiles cleanly (binary `thoughts-worker` produced). - [ ] **Smoke test** (requires NATS running): ```bash # Terminal 1: start NATS if not already running docker run -d --name nats -p 4222:4222 nats:latest || true # Terminal 2: start worker DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \ RUST_LOG=info \ cargo run --bin thoughts-worker & sleep 2 # Terminal 3: start API server DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev cargo run -p presentation & sleep 2 # Create a user, post a thought, like it — check that worker logs "received event" TOKEN=$(curl -s -X POST http://localhost:3000/auth/register \ -H 'content-type: application/json' \ -d '{"username":"evttest","email":"evt@test.com","password":"pw"}' | jq -r .token) TID=$(curl -s -X POST http://localhost:3000/thoughts \ -H 'content-type: application/json' \ -H "Authorization: Bearer $TOKEN" \ -d '{"content":"event test"}' | jq -r .id) curl -s -X POST http://localhost:3000/thoughts/$TID/like \ -H "Authorization: Bearer $TOKEN" kill %1 %2 2>/dev/null ``` Expected: worker logs show `received event` for the like. No errors. - [ ] **Commit:** ```bash git add crates/worker/src/main.rs git commit -m "feat(worker): consumer loop binary connecting NATS to handlers" ``` --- ### Task 5: Presentation — swap NoOp for real NatsEventPublisher **Files:** - Modify: `crates/presentation/Cargo.toml` - Modify: `crates/presentation/src/lib.rs` - Modify: `crates/presentation/src/main.rs` When NATS_URL is not set, fall back to the `NoOpEventPublisher` so the API still starts without NATS. Use an env var `NATS_URL` — if set, use real publisher; if absent, log a warning and use no-op. - [ ] **Add `nats` to `crates/presentation/Cargo.toml` deps:** ```toml nats = { workspace = true } async-nats = { workspace = true } ``` - [ ] **Update `crates/presentation/src/lib.rs`** — replace the `NoOpEventPublisher` struct and `build_state` function with one that optionally connects to NATS: Replace the existing `build_state` signature with an async version: ```rust use std::sync::Arc; use sqlx::PgPool; use state::AppState; use async_trait::async_trait; use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; pub mod errors; pub mod extractors; pub mod handlers; pub mod routes; pub mod state; use postgres_search::PgSearchRepository; struct NoOpEventPublisher; #[async_trait] impl EventPublisher for NoOpEventPublisher { async fn publish(&self, _e: &DomainEvent) -> Result<(), DomainError> { Ok(()) } } pub async fn build_state(pool: PgPool, jwt_secret: String) -> AppState { let event_publisher: Arc = match std::env::var("NATS_URL") { Ok(url) => { match async_nats::connect(&url).await { Ok(client) => { tracing::info!("Connected to NATS at {url}"); Arc::new(nats::NatsEventPublisher::new(client)) } Err(e) => { tracing::warn!("Failed to connect to NATS at {url}: {e} — using no-op publisher"); Arc::new(NoOpEventPublisher) } } } Err(_) => { tracing::info!("NATS_URL not set — using no-op event publisher"); Arc::new(NoOpEventPublisher) } }; AppState { users: Arc::new(postgres::user::PgUserRepository::new(pool.clone())), thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())), likes: Arc::new(postgres::like::PgLikeRepository::new(pool.clone())), boosts: Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())), follows: Arc::new(postgres::follow::PgFollowRepository::new(pool.clone())), blocks: Arc::new(postgres::block::PgBlockRepository::new(pool.clone())), tags: Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), api_keys: Arc::new(postgres::api_key::PgApiKeyRepository::new(pool.clone())), top_friends: Arc::new(postgres::top_friend::PgTopFriendRepository::new(pool.clone())), notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())), remote_actors: Arc::new(postgres::remote_actor::PgRemoteActorRepository::new(pool.clone())), feed: Arc::new(postgres::feed::PgFeedRepository::new(pool.clone())), search: Arc::new(PgSearchRepository::new(pool.clone())), auth: Arc::new(auth::JwtAuthService::new(jwt_secret, 86400 * 30)), hasher: Arc::new(auth::Argon2PasswordHasher), events: event_publisher, } } ``` - [ ] **Update `crates/presentation/src/main.rs`** — `build_state` is now async, so await it: ```rust use sqlx::PgPool; use tower_http::cors::CorsLayer; use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() { dotenvy::dotenv().ok(); tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); let jwt_secret = std::env::var("JWT_SECRET").expect("JWT_SECRET required"); let port = std::env::var("PORT").unwrap_or_else(|_| "3000".into()); let pool = PgPool::connect(&database_url).await.expect("DB connect failed"); sqlx::migrate!("../adapters/postgres/migrations").run(&pool).await.expect("Migrations failed"); let state = presentation::build_state(pool, jwt_secret).await; // note: .await let app = presentation::routes::router() .with_state(state) .layer(CorsLayer::permissive()); let addr = format!("0.0.0.0:{port}"); tracing::info!("Listening on {addr}"); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); axum::serve(listener, app).await.unwrap(); } ``` - [ ] **Run:** `cargo build -p presentation` Expected: clean build. - [ ] **Verify no-op fallback works** (without NATS running): ```bash DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev \ RUST_LOG=info cargo run -p presentation & sleep 2 # Should log: "NATS_URL not set — using no-op event publisher" curl -s -X POST http://localhost:3000/auth/register \ -H 'content-type: application/json' \ -d '{"username":"natstest","email":"nats@test.com","password":"pw"}' | jq .token kill %1 ``` - [ ] **Run full test suite:** ```bash DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace ``` Expected: all tests pass (52 + new worker tests = 55+). - [ ] **Commit:** ```bash git add crates/presentation/ git commit -m "feat(presentation): NatsEventPublisher with no-op fallback when NATS_URL unset" ``` --- ## Self-Review **Spec coverage:** - ✅ event-payload: serializable EventPayload enum, subject(), From/TryFrom conversions (Task 1) - ✅ nats: NatsEventPublisher implementing EventPublisher (Task 2) - ✅ nats: NatsEventConsumer implementing EventConsumer via BoxStream (Task 2) - ✅ worker: NotificationHandler (LikeAdded, BoostAdded, FollowAccepted → notifications) (Task 3) - ✅ worker: FederationHandler stub (Task 3) - ✅ worker: consumer loop binary (Task 4) - ✅ presentation: real NATS publisher with graceful no-op fallback (Task 5) - ✅ event-publisher: stays as stub (correct — deferred per plan) **Placeholder scan:** None — all code blocks complete. **Type consistency:** - `NatsEventPublisher::new(client: async_nats::Client)` — matches usage in presentation lib.rs and worker main.rs - `NatsEventConsumer::new(client: async_nats::Client)` — matches worker main.rs - `NotificationHandler { thoughts, notifications }` — field names match handler usage in main.rs - `build_state` is now `async fn` — main.rs correctly awaits it - `EventPayload::from(&DomainEvent)` — implemented in nats crate (which sees both types) **Notes:** - Basic NATS (at-most-once delivery) is used — JetStream (exactly-once) deferred to later - Worker Cargo.toml includes `postgres` internal crate for database access in handlers - `crates/adapters/nats` has Rust module name `nats` but package name `nats` — import as `use nats::...`