# Thoughts v2 — Plan 3: Events + Worker (NATS)
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Wire real async event processing — use cases publish domain events to NATS, a worker binary subscribes and runs handlers (NotificationHandler creates DB records; FederationHandler is stubbed for Plan 4).
**Architecture:** `event-payload/` holds the serializable NATS wire types. `nats/` wraps `async-nats` and implements both `EventPublisher` (publish to NATS) and `EventConsumer` (subscribe, yield `EventEnvelope` stream). `worker/` is a standalone binary that consumes events and dispatches to handlers. `presentation/` swaps its `NoOpEventPublisher` for the real NATS publisher. `event-publisher/` stays a stub (future fan-out to multiple backends).
**Tech Stack:** Rust, async-nats 0.38, serde_json, futures, async-stream, tokio
**Prerequisites:** NATS server running locally. Start with:
```bash
docker run -d --name nats -p 4222:4222 nats:latest
# or add to docker-compose if preferred
```
---
## File Map
```
Modified: Cargo.toml ← add async-nats, async-stream to workspace.dependencies
Modified: crates/adapters/event-payload/Cargo.toml ← add deps
Modified: crates/adapters/event-payload/src/lib.rs ← EventPayload enum + subject() + From<&DomainEvent>
Modified: crates/adapters/nats/Cargo.toml ← add deps
Modified: crates/adapters/nats/src/lib.rs ← NatsEventPublisher + NatsEventConsumer
Modified: crates/worker/Cargo.toml ← add deps, add [[bin]]
Create: crates/worker/src/handlers.rs ← NotificationHandler, FederationHandler (stub)
Modified: crates/worker/src/main.rs ← consumer loop binary
Modified: crates/presentation/src/lib.rs ← swap NoOp for NatsEventPublisher
Modified: crates/presentation/Cargo.toml ← add nats dep
Modified: crates/presentation/src/main.rs ← await the now-async build_state
```
---
### Task 1: Workspace deps + event-payload crate
**Files:**
- Modify: `Cargo.toml` (root workspace)
- Modify: `crates/adapters/event-payload/Cargo.toml`
- Modify: `crates/adapters/event-payload/src/lib.rs`
- [ ] **Add to root `Cargo.toml` `[workspace.dependencies]`:**
```toml
async-nats = "0.38"
async-stream = "0.3"
event-payload = { path = "crates/adapters/event-payload" }
event-publisher = { path = "crates/adapters/event-publisher" }
nats = { path = "crates/adapters/nats" }
```
Check if `event-payload`, `event-publisher`, `nats` are already listed — they should be from Plan 1 scaffolding. If so, skip those lines and only add `async-nats` and `async-stream`.
- [ ] **Write `crates/adapters/event-payload/Cargo.toml`:**
```toml
[package]
name = "event-payload"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { workspace = true }
serde_json = { workspace = true }
```
- [ ] **Write `crates/adapters/event-payload/src/lib.rs`:**
```rust
use serde::{Deserialize, Serialize};
/// Serializable mirror of domain::events::DomainEvent.
/// All IDs are Strings (UUID hex) — no domain type dependencies.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum EventPayload {
ThoughtCreated {
thought_id: String,
user_id: String,
in_reply_to_id: Option<String>,
},
ThoughtDeleted {
thought_id: String,
user_id: String,
},
ThoughtUpdated {
thought_id: String,
user_id: String,
},
LikeAdded {
like_id: String,
user_id: String,
thought_id: String,
},
LikeRemoved {
user_id: String,
thought_id: String,
},
BoostAdded {
boost_id: String,
user_id: String,
thought_id: String,
},
BoostRemoved {
user_id: String,
thought_id: String,
},
FollowRequested {
follower_id: String,
following_id: String,
},
FollowAccepted {
follower_id: String,
following_id: String,
},
FollowRejected {
follower_id: String,
following_id: String,
},
Unfollowed {
follower_id: String,
following_id: String,
},
UserBlocked {
blocker_id: String,
blocked_id: String,
},
}
impl EventPayload {
/// Returns the NATS subject for this event.
pub fn subject(&self) -> &'static str {
match self {
Self::ThoughtCreated { .. } => "thoughts.created",
Self::ThoughtDeleted { .. } => "thoughts.deleted",
Self::ThoughtUpdated { .. } => "thoughts.updated",
Self::LikeAdded { .. } => "likes.added",
Self::LikeRemoved { .. } => "likes.removed",
Self::BoostAdded { .. } => "boosts.added",
Self::BoostRemoved { .. } => "boosts.removed",
Self::FollowRequested { .. } => "follows.requested",
Self::FollowAccepted { .. } => "follows.accepted",
Self::FollowRejected { .. } => "follows.rejected",
Self::Unfollowed { .. } => "follows.removed",
Self::UserBlocked { .. } => "users.blocked",
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn thought_created_roundtrip() {
let p = EventPayload::ThoughtCreated {
thought_id: "abc".into(),
user_id: "def".into(),
in_reply_to_id: None,
};
let json = serde_json::to_string(&p).unwrap();
let back: EventPayload = serde_json::from_str(&json).unwrap();
assert_eq!(back.subject(), "thoughts.created");
}
#[test]
fn all_subjects_are_unique() {
let samples: &[EventPayload] = &[
EventPayload::ThoughtCreated { thought_id: "a".into(), user_id: "b".into(), in_reply_to_id: None },
EventPayload::ThoughtDeleted { thought_id: "a".into(), user_id: "b".into() },
EventPayload::ThoughtUpdated { thought_id: "a".into(), user_id: "b".into() },
EventPayload::LikeAdded { like_id: "a".into(), user_id: "b".into(), thought_id: "c".into() },
EventPayload::LikeRemoved { user_id: "b".into(), thought_id: "c".into() },
EventPayload::BoostAdded { boost_id: "a".into(), user_id: "b".into(), thought_id: "c".into() },
EventPayload::BoostRemoved { user_id: "b".into(), thought_id: "c".into() },
EventPayload::FollowRequested { follower_id: "a".into(), following_id: "b".into() },
EventPayload::FollowAccepted { follower_id: "a".into(), following_id: "b".into() },
EventPayload::FollowRejected { follower_id: "a".into(), following_id: "b".into() },
EventPayload::Unfollowed { follower_id: "a".into(), following_id: "b".into() },
EventPayload::UserBlocked { blocker_id: "a".into(), blocked_id: "b".into() },
];
let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect();
subjects.sort();
subjects.dedup();
assert_eq!(subjects.len(), samples.len(), "each event must have a unique subject");
}
}
```
- [ ] **Run:** `cargo test -p event-payload`
Expected: 2 tests pass.
- [ ] **Commit:**
```bash
git add Cargo.toml crates/adapters/event-payload/
git commit -m "feat(event-payload): serializable NATS event payload types"
```
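The dotted subjects returned by `subject()` are what let a subscriber select events with NATS wildcards: `*` matches exactly one token and `>` matches one or more trailing tokens. The block below is a minimal, std-only sketch of that matching rule, handy for reasoning about which Task 1 subjects a given pattern would receive. For example, `likes.*` selects `likes.added` and `likes.removed`, while `>` selects every event. `subject_matches` is a hypothetical helper written for illustration; it is not part of `async-nats`, and it assumes well-formed patterns (`>` only as the final token).

```rust
/// Returns true if `subject` would be delivered to a subscription on `pattern`,
/// following NATS wildcard rules: `*` matches exactly one token, `>` matches
/// one or more remaining tokens. Hypothetical helper, for illustration only.
pub fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` swallows the rest of the subject, but needs at least one token.
            (Some(">"), Some(_)) => return true,
            // `*` matches any single token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal token.
            _ => return false,
        }
    }
}
```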
---
### Task 2: nats crate — NatsEventPublisher + NatsEventConsumer
**Files:**
- Modify: `crates/adapters/nats/Cargo.toml`
- Modify: `crates/adapters/nats/src/lib.rs`
- [ ] **Write `crates/adapters/nats/Cargo.toml`:**
```toml
[package]
name = "nats"
version = "0.1.0"
edition = "2021"
[dependencies]
domain = { workspace = true }
event-payload = { workspace = true }
async-nats = { workspace = true }
async-stream = { workspace = true }
serde_json = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }
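tracing = { workspace = true }
# parse_uuid in lib.rs uses uuid::Uuid; assumes uuid is already in [workspace.dependencies]
uuid = { workspace = true }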
```
- [ ] **Write test** at bottom of `crates/adapters/nats/src/lib.rs`:
```rust
#[cfg(test)]
mod tests {
use super::*;
use domain::value_objects::{ThoughtId, UserId};
#[test]
fn payload_from_domain_event_has_correct_subject() {
let event = domain::events::DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: UserId::new(),
in_reply_to_id: None,
};
let payload = EventPayload::from(&event);
assert_eq!(payload.subject(), "thoughts.created");
}
#[test]
fn domain_event_roundtrip_via_payload() {
let uid = UserId::new();
let tid = ThoughtId::new();
let event = domain::events::DomainEvent::LikeAdded {
like_id: domain::value_objects::LikeId::new(),
user_id: uid.clone(),
thought_id: tid.clone(),
};
let payload = EventPayload::from(&event);
let back = domain::events::DomainEvent::try_from(payload).unwrap();
if let domain::events::DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
assert_eq!(user_id, uid);
assert_eq!(thought_id, tid);
} else {
panic!("wrong variant");
}
}
}
```
- [ ] **Run:** `cargo test -p nats`. Expected: FAIL to compile (the conversions and NATS types the tests reference are not implemented yet).
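- [ ] **Write `crates/adapters/nats/src/lib.rs`.** Caution: `nats` owns neither `DomainEvent` nor `EventPayload`, so Rust's orphan rule (E0117) rejects the `From`/`TryFrom` impls below when they live in this crate. Either move both impl blocks (and `parse_uuid`) into `event-payload`, adding `domain` and `uuid` to its dependencies, or rewrite them as free functions here: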
```rust
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::{DomainEvent, EventEnvelope},
ports::{EventConsumer, EventPublisher},
value_objects::{BoostId, LikeId, ThoughtId, UserId},
};
use event_payload::EventPayload;
use futures::stream::BoxStream;
// ── DomainEvent → EventPayload ─────────────────────────────────────────────
impl From<&DomainEvent> for EventPayload {
fn from(e: &DomainEvent) -> Self {
match e {
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => Self::ThoughtCreated {
thought_id: thought_id.to_string(),
user_id: user_id.to_string(),
in_reply_to_id: in_reply_to_id.as_ref().map(|x| x.to_string()),
},
DomainEvent::ThoughtDeleted { thought_id, user_id } => Self::ThoughtDeleted {
thought_id: thought_id.to_string(), user_id: user_id.to_string(),
},
DomainEvent::ThoughtUpdated { thought_id, user_id } => Self::ThoughtUpdated {
thought_id: thought_id.to_string(), user_id: user_id.to_string(),
},
DomainEvent::LikeAdded { like_id, user_id, thought_id } => Self::LikeAdded {
like_id: like_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(),
},
DomainEvent::LikeRemoved { user_id, thought_id } => Self::LikeRemoved {
user_id: user_id.to_string(), thought_id: thought_id.to_string(),
},
DomainEvent::BoostAdded { boost_id, user_id, thought_id } => Self::BoostAdded {
boost_id: boost_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(),
},
DomainEvent::BoostRemoved { user_id, thought_id } => Self::BoostRemoved {
user_id: user_id.to_string(), thought_id: thought_id.to_string(),
},
DomainEvent::FollowRequested { follower_id, following_id } => Self::FollowRequested {
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
},
DomainEvent::FollowAccepted { follower_id, following_id } => Self::FollowAccepted {
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
},
DomainEvent::FollowRejected { follower_id, following_id } => Self::FollowRejected {
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
},
DomainEvent::Unfollowed { follower_id, following_id } => Self::Unfollowed {
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
},
DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked {
blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(),
},
}
}
}
// ── EventPayload → DomainEvent ─────────────────────────────────────────────
fn parse_uuid(s: &str, field: &str) -> Result<uuid::Uuid, DomainError> {
uuid::Uuid::parse_str(s)
.map_err(|_| DomainError::Internal(format!("invalid uuid for {field}: {s}")))
}
impl TryFrom<EventPayload> for DomainEvent {
type Error = DomainError;
fn try_from(p: EventPayload) -> Result<Self, DomainError> {
Ok(match p {
EventPayload::ThoughtCreated { thought_id, user_id, in_reply_to_id } => DomainEvent::ThoughtCreated {
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
in_reply_to_id: in_reply_to_id
.map(|s| parse_uuid(&s, "in_reply_to_id").map(ThoughtId::from_uuid))
.transpose()?,
},
EventPayload::ThoughtDeleted { thought_id, user_id } => DomainEvent::ThoughtDeleted {
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
},
EventPayload::ThoughtUpdated { thought_id, user_id } => DomainEvent::ThoughtUpdated {
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
},
EventPayload::LikeAdded { like_id, user_id, thought_id } => DomainEvent::LikeAdded {
like_id: LikeId::from_uuid(parse_uuid(&like_id, "like_id")?),
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
},
EventPayload::LikeRemoved { user_id, thought_id } => DomainEvent::LikeRemoved {
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
},
EventPayload::BoostAdded { boost_id, user_id, thought_id } => DomainEvent::BoostAdded {
boost_id: BoostId::from_uuid(parse_uuid(&boost_id, "boost_id")?),
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
},
EventPayload::BoostRemoved { user_id, thought_id } => DomainEvent::BoostRemoved {
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
},
EventPayload::FollowRequested { follower_id, following_id } => DomainEvent::FollowRequested {
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
},
EventPayload::FollowAccepted { follower_id, following_id } => DomainEvent::FollowAccepted {
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
},
EventPayload::FollowRejected { follower_id, following_id } => DomainEvent::FollowRejected {
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
},
EventPayload::Unfollowed { follower_id, following_id } => DomainEvent::Unfollowed {
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
},
EventPayload::UserBlocked { blocker_id, blocked_id } => DomainEvent::UserBlocked {
blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
},
})
}
}
// ── NatsEventPublisher ────────────────────────────────────────────────────
pub struct NatsEventPublisher {
client: async_nats::Client,
}
impl NatsEventPublisher {
pub fn new(client: async_nats::Client) -> Self { Self { client } }
}
#[async_trait]
impl EventPublisher for NatsEventPublisher {
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
let payload = EventPayload::from(event);
let subject = payload.subject();
let bytes = serde_json::to_vec(&payload)
.map_err(|e| DomainError::Internal(e.to_string()))?;
self.client
.publish(subject, bytes.into())
.await
.map_err(|e| DomainError::Internal(e.to_string()))
}
}
// ── NatsEventConsumer ─────────────────────────────────────────────────────
pub struct NatsEventConsumer {
client: async_nats::Client,
}
impl NatsEventConsumer {
pub fn new(client: async_nats::Client) -> Self { Self { client } }
}
impl EventConsumer for NatsEventConsumer {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
let client = self.client.clone();
Box::pin(async_stream::try_stream! {
let mut sub = client
.subscribe(">")
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
use futures::StreamExt;
while let Some(msg) = sub.next().await {
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
Ok(p) => p,
Err(e) => {
tracing::warn!("failed to deserialize event payload: {e}");
continue;
}
};
let event = match DomainEvent::try_from(payload) {
Ok(e) => e,
Err(e) => {
tracing::warn!("failed to convert payload to domain event: {e}");
continue;
}
};
// Basic NATS has no ack/nack — at-most-once delivery
yield EventEnvelope {
event,
ack: Box::new(|| {}),
nack: Box::new(|| {}),
};
}
})
}
}
```
- [ ] **Run:** `cargo test -p nats`
Expected: 2 tests pass.
- [ ] **Commit:**
```bash
git add crates/adapters/nats/
git commit -m "feat(nats): NatsEventPublisher and NatsEventConsumer with payload conversion"
```
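Because `From` and `TryFrom` are foreign traits and both `DomainEvent` and `EventPayload` are defined in other crates, Rust's orphan rule only accepts those impls in a crate that owns one of the two types. One alternative that keeps the conversions inside `nats` is to write them as plain functions; call sites would then read `to_payload(event)` in `publish` and `to_domain_event(payload)` in `consume`. The sketch below shows that shape with simplified stand-ins: each enum carries a single variant, `UserId` and `ThoughtId` wrap a `u64` rather than a UUID, and the error is a `String` instead of `DomainError`. The function names `to_payload`, `to_domain_event`, and `parse_id` are hypothetical.

```rust
// Simplified stand-ins for domain::value_objects (assumption: the real IDs wrap a UUID).
#[derive(Debug, Clone, PartialEq)]
pub struct UserId(pub u64);
#[derive(Debug, Clone, PartialEq)]
pub struct ThoughtId(pub u64);

// Stand-in for domain::events::DomainEvent, reduced to one variant.
#[derive(Debug, Clone, PartialEq)]
pub enum DomainEvent {
    LikeRemoved { user_id: UserId, thought_id: ThoughtId },
}

// Stand-in for event_payload::EventPayload: IDs travel as strings.
#[derive(Debug, Clone, PartialEq)]
pub enum EventPayload {
    LikeRemoved { user_id: String, thought_id: String },
}

/// Free-function counterpart of `impl From<&DomainEvent> for EventPayload`.
pub fn to_payload(event: &DomainEvent) -> EventPayload {
    match event {
        DomainEvent::LikeRemoved { user_id, thought_id } => EventPayload::LikeRemoved {
            user_id: user_id.0.to_string(),
            thought_id: thought_id.0.to_string(),
        },
    }
}

/// Parses an ID, naming the offending field in the error message.
fn parse_id(s: &str, field: &str) -> Result<u64, String> {
    s.parse::<u64>()
        .map_err(|_| format!("invalid id for {field}: {s}"))
}

/// Free-function counterpart of `impl TryFrom<EventPayload> for DomainEvent`.
pub fn to_domain_event(payload: EventPayload) -> Result<DomainEvent, String> {
    match payload {
        EventPayload::LikeRemoved { user_id, thought_id } => Ok(DomainEvent::LikeRemoved {
            user_id: UserId(parse_id(&user_id, "user_id")?),
            thought_id: ThoughtId(parse_id(&thought_id, "thought_id")?),
        }),
    }
}
```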
---
### Task 3: worker — NotificationHandler + FederationHandler
**Files:**
- Modify: `crates/worker/Cargo.toml`
- Create: `crates/worker/src/handlers.rs`
- [ ] **Write `crates/worker/Cargo.toml`:**
```toml
[package]
name = "worker"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "thoughts-worker"
path = "src/main.rs"
[dependencies]
domain = { workspace = true }
nats = { workspace = true }
event-payload = { workspace = true }
postgres = { workspace = true }
async-nats = { workspace = true }
tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
dotenvy = { workspace = true }
serde_json = { workspace = true }
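chrono = { workspace = true }
# main.rs (Task 4) uses sqlx::PgPool; assumes sqlx is already in [workspace.dependencies]
sqlx = { workspace = true }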
```
- [ ] **Write tests** at bottom of `crates/worker/src/handlers.rs`:
```rust
#[cfg(test)]
mod tests {
use super::*;
use domain::{
models::{thought::{Thought, Visibility}, user::User},
testing::TestStore,
value_objects::*,
};
use std::sync::Arc;
fn alice() -> User {
User::new_local(
UserId::new(),
Username::new("alice").unwrap(),
Email::new("alice@ex.com").unwrap(),
PasswordHash("h".into()),
)
}
#[tokio::test]
async fn like_added_creates_notification_for_thought_author() {
let store = TestStore::default();
let alice = alice();
let bob_id = UserId::new();
// alice posts a thought
let thought = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("hello").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(thought.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
// bob likes alice's thought
handler.handle(&DomainEvent::LikeAdded {
like_id: LikeId::new(),
user_id: bob_id.clone(),
thought_id: thought.id.clone(),
}).await.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert_eq!(notifs[0].user_id, alice.id); // notification goes to alice
assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Like));
}
#[tokio::test]
async fn self_like_does_not_create_notification() {
let store = TestStore::default();
let alice = alice();
let thought = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("hello").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(thought.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::LikeAdded {
like_id: LikeId::new(),
user_id: alice.id.clone(), // alice likes her own thought
thought_id: thought.id.clone(),
}).await.unwrap();
assert!(store.notifications.lock().unwrap().is_empty());
}
#[tokio::test]
async fn follow_accepted_creates_notification() {
let store = TestStore::default();
let alice = alice();
let bob_id = UserId::new();
store.users.lock().unwrap().push(alice.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
// bob follows alice (alice gets notified)
handler.handle(&DomainEvent::FollowAccepted {
follower_id: bob_id.clone(),
following_id: alice.id.clone(),
}).await.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert_eq!(notifs[0].user_id, alice.id);
assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Follow));
}
}
```
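- [ ] **Run:** `cargo test -p worker`. Expected: FAIL to compile (`NotificationHandler` is not defined yet). This assumes `src/main.rs` already declares `mod handlers;`. If the scaffolded file lacks it, add that line first, otherwise the tests in `handlers.rs` are never compiled.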
- [ ] **Create `crates/worker/src/handlers.rs`:**
```rust
use std::sync::Arc;
use chrono::Utc;
use domain::{
errors::DomainError,
events::DomainEvent,
models::notification::{Notification, NotificationType},
ports::{NotificationRepository, ThoughtRepository},
value_objects::NotificationId,
};
/// Handles domain events that should create notifications for users.
pub struct NotificationHandler {
pub thoughts: Arc<dyn ThoughtRepository>,
pub notifications: Arc<dyn NotificationRepository>,
}
impl NotificationHandler {
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
match event {
DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => {
let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) => t,
None => return Ok(()), // thought deleted — skip
};
if thought.user_id == *user_id { return Ok(()); } // no self-notifications
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: thought.user_id,
notification_type: NotificationType::Like,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
read: false,
created_at: Utc::now(),
}).await
}
DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => {
let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) => t,
None => return Ok(()),
};
if thought.user_id == *user_id { return Ok(()); }
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: thought.user_id,
notification_type: NotificationType::Boost,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
read: false,
created_at: Utc::now(),
}).await
}
DomainEvent::FollowAccepted { follower_id, following_id } => {
// The person being followed (following_id) gets notified
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: following_id.clone(),
notification_type: NotificationType::Follow,
from_user_id: Some(follower_id.clone()),
thought_id: None,
read: false,
created_at: Utc::now(),
}).await
}
// All other events: no notification needed in Plan 3
_ => Ok(()),
}
}
}
/// Stub handler for ActivityPub federation — implemented in Plan 4.
pub struct FederationHandler;
impl FederationHandler {
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
tracing::debug!(event = ?event, "federation handler (stub — Plan 4)");
Ok(())
}
}
```
- [ ] **Run:** `cargo test -p worker` — Expected: 3 tests pass.
- [ ] **Commit:**
```bash
git add crates/worker/
git commit -m "feat(worker): NotificationHandler and FederationHandler stub"
```
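The recipient rules implemented by `NotificationHandler` can be summarized as a pure function: a like or boost notifies the thought's author unless the actor is the author, an accepted follow notifies the followed user, and every other event produces nothing. The sketch below restates those rules for review purposes; it is not the handler itself and leaves out the "thought no longer exists" case, which the handler also skips. `Action`, `Kind`, and `recipient` are hypothetical names, and user IDs are plain integers here.

```rust
/// What happened, reduced to the fields that decide who gets notified.
/// Hypothetical stand-in for the relevant `DomainEvent` variants.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Action {
    /// `actor` liked a thought written by `author`.
    Like { actor: u64, author: u64 },
    /// `actor` boosted a thought written by `author`.
    Boost { actor: u64, author: u64 },
    /// `follower` now follows `following`.
    FollowAccepted { follower: u64, following: u64 },
    /// Any event that produces no notification in Plan 3.
    Other,
}

/// Notification type, mirroring the three kinds used by the handler.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Kind {
    Like,
    Boost,
    Follow,
}

/// Returns `(recipient, kind)`, or `None` when no notification is created.
pub fn recipient(action: Action) -> Option<(u64, Kind)> {
    match action {
        // Likes and boosts go to the author, except for self-interactions.
        Action::Like { actor, author } if actor != author => Some((author, Kind::Like)),
        Action::Boost { actor, author } if actor != author => Some((author, Kind::Boost)),
        // The followed user is told about the new follower.
        Action::FollowAccepted { following, .. } => Some((following, Kind::Follow)),
        _ => None,
    }
}
```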
---
### Task 4: worker main binary
**Files:**
- Modify: `crates/worker/src/main.rs`
- [ ] **Write `crates/worker/src/main.rs`:**
```rust
mod handlers;
use std::sync::Arc;
use futures::StreamExt;
use sqlx::PgPool;
use domain::ports::EventConsumer;
#[tokio::main]
async fn main() {
dotenvy::dotenv().ok();
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
tracing::info!("Connecting to postgres...");
let pool = PgPool::connect(&database_url).await.expect("DB connect failed");
tracing::info!("Connecting to NATS at {nats_url}...");
let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed");
let consumer = nats::NatsEventConsumer::new(nats_client);
let notification_handler = handlers::NotificationHandler {
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
};
let federation_handler = handlers::FederationHandler;
tracing::info!("Worker started, consuming events...");
let mut stream = consumer.consume();
while let Some(result) = stream.next().await {
match result {
Ok(envelope) => {
let event = &envelope.event;
tracing::debug!(event = ?event, "received event");
let n_result = notification_handler.handle(event).await;
let f_result = federation_handler.handle(event).await;
if n_result.is_ok() && f_result.is_ok() {
(envelope.ack)();
} else {
if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); }
if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); }
(envelope.nack)();
}
}
Err(e) => {
tracing::error!("consumer error: {e}");
}
}
}
}
```
- [ ] **Run:** `cargo build -p worker`
Expected: compiles cleanly (binary `thoughts-worker` produced).
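- [ ] **Smoke test** (requires NATS running; until Task 5 swaps in the NATS publisher the API publishes nothing, so expect the worker to stay silent before then and rerun this check after Task 5):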
```bash
# Terminal 1: start NATS if not already running
docker run -d --name nats -p 4222:4222 nats:latest || true
# Terminal 2: start worker
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \
RUST_LOG=debug \
cargo run --bin thoughts-worker &
sleep 2
# Terminal 3: start API server
NATS_URL=nats://localhost:4222 DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev cargo run -p presentation &
sleep 2
# Create a user, post a thought, like it — check that worker logs "received event"
TOKEN=$(curl -s -X POST http://localhost:3000/auth/register \
-H 'content-type: application/json' \
-d '{"username":"evttest","email":"evt@test.com","password":"pw"}' | jq -r .token)
TID=$(curl -s -X POST http://localhost:3000/thoughts \
-H 'content-type: application/json' \
-H "Authorization: Bearer $TOKEN" \
-d '{"content":"event test"}' | jq -r .id)
curl -s -X POST http://localhost:3000/thoughts/$TID/like \
-H "Authorization: Bearer $TOKEN"
kill %1 %2 2>/dev/null
```
Expected: worker logs show `received event` for the like. No errors.
- [ ] **Commit:**
```bash
git add crates/worker/src/main.rs
git commit -m "feat(worker): consumer loop binary connecting NATS to handlers"
```
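The acknowledgement rule in the `main.rs` loop is: run every handler, then `ack` only if all of them succeeded, otherwise log the errors and `nack`. With basic NATS both callbacks are no-ops, but the rule starts to matter once a broker with real acknowledgements is plugged in. The block below is a minimal synchronous sketch of that rule; `Envelope` and `dispatch` are hypothetical stand-ins (the real `EventEnvelope` lives in `domain`, its callback types may differ, and the real handlers are async).

```rust
/// Stand-in for `EventEnvelope`: an event plus acknowledgement callbacks.
pub struct Envelope<E> {
    pub event: E,
    pub ack: Box<dyn Fn()>,
    pub nack: Box<dyn Fn()>,
}

/// Runs every handler on the event (no short-circuit, matching the worker loop),
/// then acks if all succeeded or nacks otherwise. Returns the collected errors
/// so the caller can log them.
pub fn dispatch<E>(
    envelope: Envelope<E>,
    handlers: &[&dyn Fn(&E) -> Result<(), String>],
) -> Vec<String> {
    let errors: Vec<String> = handlers
        .iter()
        .filter_map(|handler| handler(&envelope.event).err())
        .collect();
    if errors.is_empty() {
        (envelope.ack)();
    } else {
        (envelope.nack)();
    }
    errors
}
```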
---
### Task 5: Presentation — swap NoOp for real NatsEventPublisher
**Files:**
- Modify: `crates/presentation/Cargo.toml`
- Modify: `crates/presentation/src/lib.rs`
- Modify: `crates/presentation/src/main.rs`
The publisher is selected from the `NATS_URL` env var. If it is set and the connection succeeds, use the real NATS publisher. If it is unset (logged at info) or the connection fails (logged as a warning), fall back to `NoOpEventPublisher` so the API still starts without NATS.
- [ ] **Add `nats` to `crates/presentation/Cargo.toml` deps:**
```toml
nats = { workspace = true }
async-nats = { workspace = true }
```
- [ ] **Update `crates/presentation/src/lib.rs`:** keep `NoOpEventPublisher` as the fallback and replace the existing `build_state` with an async version that optionally connects to NATS:
```rust
use std::sync::Arc;
use sqlx::PgPool;
use state::AppState;
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
pub mod errors;
pub mod extractors;
pub mod handlers;
pub mod routes;
pub mod state;
use postgres_search::PgSearchRepository;
struct NoOpEventPublisher;
#[async_trait]
impl EventPublisher for NoOpEventPublisher {
async fn publish(&self, _e: &DomainEvent) -> Result<(), DomainError> { Ok(()) }
}
pub async fn build_state(pool: PgPool, jwt_secret: String) -> AppState {
let event_publisher: Arc<dyn EventPublisher> = match std::env::var("NATS_URL") {
Ok(url) => {
match async_nats::connect(&url).await {
Ok(client) => {
tracing::info!("Connected to NATS at {url}");
Arc::new(nats::NatsEventPublisher::new(client))
}
Err(e) => {
tracing::warn!("Failed to connect to NATS at {url}: {e} — using no-op publisher");
Arc::new(NoOpEventPublisher)
}
}
}
Err(_) => {
tracing::info!("NATS_URL not set — using no-op event publisher");
Arc::new(NoOpEventPublisher)
}
};
AppState {
users: Arc::new(postgres::user::PgUserRepository::new(pool.clone())),
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
likes: Arc::new(postgres::like::PgLikeRepository::new(pool.clone())),
boosts: Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())),
follows: Arc::new(postgres::follow::PgFollowRepository::new(pool.clone())),
blocks: Arc::new(postgres::block::PgBlockRepository::new(pool.clone())),
tags: Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
api_keys: Arc::new(postgres::api_key::PgApiKeyRepository::new(pool.clone())),
top_friends: Arc::new(postgres::top_friend::PgTopFriendRepository::new(pool.clone())),
notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
remote_actors: Arc::new(postgres::remote_actor::PgRemoteActorRepository::new(pool.clone())),
feed: Arc::new(postgres::feed::PgFeedRepository::new(pool.clone())),
search: Arc::new(PgSearchRepository::new(pool.clone())),
auth: Arc::new(auth::JwtAuthService::new(jwt_secret, 86400 * 30)),
hasher: Arc::new(auth::Argon2PasswordHasher),
events: event_publisher,
}
}
```
- [ ] **Update `crates/presentation/src/main.rs`:** `build_state` is now async, so await it:
```rust
use sqlx::PgPool;
use tower_http::cors::CorsLayer;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() {
dotenvy::dotenv().ok();
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
let jwt_secret = std::env::var("JWT_SECRET").expect("JWT_SECRET required");
let port = std::env::var("PORT").unwrap_or_else(|_| "3000".into());
let pool = PgPool::connect(&database_url).await.expect("DB connect failed");
sqlx::migrate!("../adapters/postgres/migrations").run(&pool).await.expect("Migrations failed");
let state = presentation::build_state(pool, jwt_secret).await; // note: .await
let app = presentation::routes::router()
.with_state(state)
.layer(CorsLayer::permissive());
let addr = format!("0.0.0.0:{port}");
tracing::info!("Listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
```
- [ ] **Run:** `cargo build -p presentation`
Expected: clean build.
- [ ] **Verify no-op fallback works** (without NATS running):
```bash
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev \
RUST_LOG=info cargo run -p presentation &
sleep 2
# Should log: "NATS_URL not set — using no-op event publisher"
curl -s -X POST http://localhost:3000/auth/register \
-H 'content-type: application/json' \
-d '{"username":"natstest","email":"nats@test.com","password":"pw"}' | jq .token
kill %1
```
- [ ] **Run full test suite:**
```bash
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace
```
Expected: all tests pass (52 existing + 2 event-payload + 2 nats + 3 worker = at least 59).
- [ ] **Commit:**
```bash
git add crates/presentation/
git commit -m "feat(presentation): NatsEventPublisher with no-op fallback when NATS_URL unset"
```
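The fallback in `build_state` has three outcomes: `NATS_URL` unset, `NATS_URL` set but unreachable, and connected. Separating that decision from the actual network call makes it unit-testable without a broker. The block below is a minimal sketch of that separation, not the code from this plan: `PublisherChoice` and `choose_publisher` are hypothetical names, and the `connect` closure stands in for `async_nats::connect`, reduced to a synchronous success/failure check.

```rust
/// Outcome of the publisher selection. Hypothetical type for illustration.
#[derive(Debug, PartialEq)]
pub enum PublisherChoice {
    /// Connected; carries the URL that was used.
    Nats(String),
    /// Fallback to the no-op publisher; carries the reason.
    NoOp(&'static str),
}

/// Decides which publisher to use. `connect` is injected so tests can
/// simulate a reachable or unreachable broker.
pub fn choose_publisher(
    nats_url: Option<&str>,
    connect: impl Fn(&str) -> Result<(), String>,
) -> PublisherChoice {
    match nats_url {
        None => PublisherChoice::NoOp("NATS_URL not set"),
        Some(url) => match connect(url) {
            Ok(()) => PublisherChoice::Nats(url.to_string()),
            Err(_) => PublisherChoice::NoOp("connect failed"),
        },
    }
}
```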
---
## Self-Review
**Spec coverage:**
- ✅ event-payload: serializable EventPayload enum, subject(), From/TryFrom conversions (Task 1)
- ✅ nats: NatsEventPublisher implementing EventPublisher (Task 2)
- ✅ nats: NatsEventConsumer implementing EventConsumer via BoxStream (Task 2)
- ✅ worker: NotificationHandler (LikeAdded, BoostAdded, FollowAccepted → notifications) (Task 3)
- ✅ worker: FederationHandler stub (Task 3)
- ✅ worker: consumer loop binary (Task 4)
- ✅ presentation: real NATS publisher with graceful no-op fallback (Task 5)
- ✅ event-publisher: stays as stub (correct — deferred per plan)
**Placeholder scan:** None — all code blocks complete.
**Type consistency:**
- `NatsEventPublisher::new(client: async_nats::Client)`: matches usage in presentation lib.rs (worker main.rs constructs only the consumer)
- `NatsEventConsumer::new(client: async_nats::Client)` — matches worker main.rs
- `NotificationHandler { thoughts, notifications }` — field names match handler usage in main.rs
- `build_state` is now `async fn` — main.rs correctly awaits it
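- `EventPayload::from(&DomainEvent)`: the Task 2 listing places this impl (and the `TryFrom` counterpart) in the nats crate, but nats owns neither type, so the orphan rule (E0117) rejects it there. The impls must live in `event-payload`, as the File Map lists (with `domain` and `uuid` added to its dependencies), or become free functions in `nats`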
**Notes:**
- Basic NATS (at-most-once delivery) is used; JetStream (at-least-once by default, with optional exactly-once semantics via deduplication) is deferred to later
- Worker Cargo.toml includes `postgres` internal crate for database access in handlers
- The local `crates/adapters/nats` package is named `nats`, so import it as `use nats::...`; the upstream client is the separate `async-nats` crate, imported as `async_nats`