docs: v2 Plan 3 events+worker implementation plan
This commit is contained in:
996
docs/superpowers/plans/2026-05-14-v2-plan3-events.md
Normal file
996
docs/superpowers/plans/2026-05-14-v2-plan3-events.md
Normal file
@@ -0,0 +1,996 @@
|
|||||||
|
# Thoughts v2 — Plan 3: Events + Worker (NATS)
|
||||||
|
|
||||||
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||||
|
|
||||||
|
**Goal:** Wire real async event processing — use cases publish domain events to NATS, a worker binary subscribes and runs handlers (NotificationHandler creates DB records; FederationHandler is stubbed for Plan 4).
|
||||||
|
|
||||||
|
**Architecture:** `event-payload/` holds the serializable NATS wire types. `nats/` wraps `async-nats` and implements both `EventPublisher` (publish to NATS) and `EventConsumer` (subscribe, yield `EventEnvelope` stream). `worker/` is a standalone binary that consumes events and dispatches to handlers. `presentation/` swaps its `NoOpEventPublisher` for the real NATS publisher. `event-publisher/` stays a stub (future fan-out to multiple backends).
|
||||||
|
|
||||||
|
**Tech Stack:** Rust, async-nats 0.38, serde_json, futures, async-stream, tokio
|
||||||
|
|
||||||
|
**Prerequisites:** NATS server running locally. Start with:
|
||||||
|
```bash
|
||||||
|
docker run -d --name nats -p 4222:4222 nats:latest
|
||||||
|
# or add to docker-compose if preferred
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## File Map
|
||||||
|
|
||||||
|
```
|
||||||
|
Modified: Cargo.toml ← add async-nats, async-stream to workspace.dependencies
|
||||||
|
Modified: crates/adapters/event-payload/Cargo.toml ← add deps
|
||||||
|
Modified: crates/adapters/event-payload/src/lib.rs ← EventPayload enum + subject() + From<&DomainEvent>
|
||||||
|
Modified: crates/adapters/nats/Cargo.toml ← add deps
|
||||||
|
Modified: crates/adapters/nats/src/lib.rs ← NatsEventPublisher + NatsEventConsumer
|
||||||
|
Modified: crates/worker/Cargo.toml ← add deps, add [[bin]]
|
||||||
|
Create: crates/worker/src/handlers.rs ← NotificationHandler, FederationHandler (stub)
|
||||||
|
Modified: crates/worker/src/main.rs ← consumer loop binary
|
||||||
|
Modified: crates/presentation/src/lib.rs ← swap NoOp for NatsEventPublisher
|
||||||
|
Modified: crates/presentation/Cargo.toml ← add nats dep
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 1: Workspace deps + event-payload crate
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `Cargo.toml` (root workspace)
|
||||||
|
- Modify: `crates/adapters/event-payload/Cargo.toml`
|
||||||
|
- Modify: `crates/adapters/event-payload/src/lib.rs`
|
||||||
|
|
||||||
|
- [ ] **Add to root `Cargo.toml` `[workspace.dependencies]`:**
|
||||||
|
|
||||||
|
```toml
|
||||||
|
async-nats = "0.38"
|
||||||
|
async-stream = "0.3"
|
||||||
|
|
||||||
|
event-payload = { path = "crates/adapters/event-payload" }
|
||||||
|
event-publisher = { path = "crates/adapters/event-publisher" }
|
||||||
|
nats = { path = "crates/adapters/nats" }
|
||||||
|
```
|
||||||
|
|
||||||
|
Check if `event-payload`, `event-publisher`, `nats` are already listed — they should be from Plan 1 scaffolding. If so, skip those lines and only add `async-nats` and `async-stream`.
|
||||||
|
|
||||||
|
- [ ] **Write `crates/adapters/event-payload/Cargo.toml`:**
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[package]
|
||||||
|
name = "event-payload"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
serde = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Write `crates/adapters/event-payload/src/lib.rs`:**
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Serializable mirror of domain::events::DomainEvent.
|
||||||
|
/// All IDs are Strings (UUID hex) — no domain type dependencies.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "type", content = "data")]
|
||||||
|
pub enum EventPayload {
|
||||||
|
ThoughtCreated {
|
||||||
|
thought_id: String,
|
||||||
|
user_id: String,
|
||||||
|
in_reply_to_id: Option<String>,
|
||||||
|
},
|
||||||
|
ThoughtDeleted {
|
||||||
|
thought_id: String,
|
||||||
|
user_id: String,
|
||||||
|
},
|
||||||
|
ThoughtUpdated {
|
||||||
|
thought_id: String,
|
||||||
|
user_id: String,
|
||||||
|
},
|
||||||
|
LikeAdded {
|
||||||
|
like_id: String,
|
||||||
|
user_id: String,
|
||||||
|
thought_id: String,
|
||||||
|
},
|
||||||
|
LikeRemoved {
|
||||||
|
user_id: String,
|
||||||
|
thought_id: String,
|
||||||
|
},
|
||||||
|
BoostAdded {
|
||||||
|
boost_id: String,
|
||||||
|
user_id: String,
|
||||||
|
thought_id: String,
|
||||||
|
},
|
||||||
|
BoostRemoved {
|
||||||
|
user_id: String,
|
||||||
|
thought_id: String,
|
||||||
|
},
|
||||||
|
FollowRequested {
|
||||||
|
follower_id: String,
|
||||||
|
following_id: String,
|
||||||
|
},
|
||||||
|
FollowAccepted {
|
||||||
|
follower_id: String,
|
||||||
|
following_id: String,
|
||||||
|
},
|
||||||
|
FollowRejected {
|
||||||
|
follower_id: String,
|
||||||
|
following_id: String,
|
||||||
|
},
|
||||||
|
Unfollowed {
|
||||||
|
follower_id: String,
|
||||||
|
following_id: String,
|
||||||
|
},
|
||||||
|
UserBlocked {
|
||||||
|
blocker_id: String,
|
||||||
|
blocked_id: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventPayload {
|
||||||
|
/// Returns the NATS subject for this event.
|
||||||
|
pub fn subject(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::ThoughtCreated { .. } => "thoughts.created",
|
||||||
|
Self::ThoughtDeleted { .. } => "thoughts.deleted",
|
||||||
|
Self::ThoughtUpdated { .. } => "thoughts.updated",
|
||||||
|
Self::LikeAdded { .. } => "likes.added",
|
||||||
|
Self::LikeRemoved { .. } => "likes.removed",
|
||||||
|
Self::BoostAdded { .. } => "boosts.added",
|
||||||
|
Self::BoostRemoved { .. } => "boosts.removed",
|
||||||
|
Self::FollowRequested { .. } => "follows.requested",
|
||||||
|
Self::FollowAccepted { .. } => "follows.accepted",
|
||||||
|
Self::FollowRejected { .. } => "follows.rejected",
|
||||||
|
Self::Unfollowed { .. } => "follows.removed",
|
||||||
|
Self::UserBlocked { .. } => "users.blocked",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn thought_created_roundtrip() {
|
||||||
|
let p = EventPayload::ThoughtCreated {
|
||||||
|
thought_id: "abc".into(),
|
||||||
|
user_id: "def".into(),
|
||||||
|
in_reply_to_id: None,
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&p).unwrap();
|
||||||
|
let back: EventPayload = serde_json::from_str(&json).unwrap();
|
||||||
|
assert_eq!(back.subject(), "thoughts.created");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn all_subjects_are_unique() {
|
||||||
|
let samples: &[EventPayload] = &[
|
||||||
|
EventPayload::ThoughtCreated { thought_id: "a".into(), user_id: "b".into(), in_reply_to_id: None },
|
||||||
|
EventPayload::ThoughtDeleted { thought_id: "a".into(), user_id: "b".into() },
|
||||||
|
EventPayload::ThoughtUpdated { thought_id: "a".into(), user_id: "b".into() },
|
||||||
|
EventPayload::LikeAdded { like_id: "a".into(), user_id: "b".into(), thought_id: "c".into() },
|
||||||
|
EventPayload::LikeRemoved { user_id: "b".into(), thought_id: "c".into() },
|
||||||
|
EventPayload::BoostAdded { boost_id: "a".into(), user_id: "b".into(), thought_id: "c".into() },
|
||||||
|
EventPayload::BoostRemoved { user_id: "b".into(), thought_id: "c".into() },
|
||||||
|
EventPayload::FollowRequested { follower_id: "a".into(), following_id: "b".into() },
|
||||||
|
EventPayload::FollowAccepted { follower_id: "a".into(), following_id: "b".into() },
|
||||||
|
EventPayload::FollowRejected { follower_id: "a".into(), following_id: "b".into() },
|
||||||
|
EventPayload::Unfollowed { follower_id: "a".into(), following_id: "b".into() },
|
||||||
|
EventPayload::UserBlocked { blocker_id: "a".into(), blocked_id: "b".into() },
|
||||||
|
];
|
||||||
|
let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect();
|
||||||
|
subjects.sort();
|
||||||
|
subjects.dedup();
|
||||||
|
assert_eq!(subjects.len(), samples.len(), "each event must have a unique subject");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p event-payload`
|
||||||
|
Expected: 2 tests pass.
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add Cargo.toml crates/adapters/event-payload/
|
||||||
|
git commit -m "feat(event-payload): serializable NATS event payload types"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 2: nats crate — NatsEventPublisher + NatsEventConsumer
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/adapters/nats/Cargo.toml`
|
||||||
|
- Modify: `crates/adapters/nats/src/lib.rs`
|
||||||
|
|
||||||
|
- [ ] **Write `crates/adapters/nats/Cargo.toml`:**
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[package]
|
||||||
|
name = "nats"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
domain = { workspace = true }
|
||||||
|
event-payload = { workspace = true }
|
||||||
|
async-nats = { workspace = true }
|
||||||
|
async-stream = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Write test** at bottom of `crates/adapters/nats/src/lib.rs`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use domain::value_objects::{ThoughtId, UserId};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn payload_from_domain_event_has_correct_subject() {
|
||||||
|
let event = domain::events::DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::new(),
|
||||||
|
user_id: UserId::new(),
|
||||||
|
in_reply_to_id: None,
|
||||||
|
};
|
||||||
|
let payload = EventPayload::from(&event);
|
||||||
|
assert_eq!(payload.subject(), "thoughts.created");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn domain_event_roundtrip_via_payload() {
|
||||||
|
let uid = UserId::new();
|
||||||
|
let tid = ThoughtId::new();
|
||||||
|
let event = domain::events::DomainEvent::LikeAdded {
|
||||||
|
like_id: domain::value_objects::LikeId::new(),
|
||||||
|
user_id: uid.clone(),
|
||||||
|
thought_id: tid.clone(),
|
||||||
|
};
|
||||||
|
let payload = EventPayload::from(&event);
|
||||||
|
let back = domain::events::DomainEvent::try_from(payload).unwrap();
|
||||||
|
if let domain::events::DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
|
||||||
|
assert_eq!(user_id, uid);
|
||||||
|
assert_eq!(thought_id, tid);
|
||||||
|
} else {
|
||||||
|
panic!("wrong variant");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p nats` — Expected: FAIL (lib.rs is empty).
|
||||||
|
|
||||||
|
- [ ] **Write `crates/adapters/nats/src/lib.rs`:**
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::{DomainEvent, EventEnvelope},
|
||||||
|
ports::{EventConsumer, EventPublisher},
|
||||||
|
value_objects::{BoostId, LikeId, ThoughtId, UserId},
|
||||||
|
};
|
||||||
|
use event_payload::EventPayload;
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
|
// ── DomainEvent → EventPayload ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
impl From<&DomainEvent> for EventPayload {
|
||||||
|
fn from(e: &DomainEvent) -> Self {
|
||||||
|
match e {
|
||||||
|
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => Self::ThoughtCreated {
|
||||||
|
thought_id: thought_id.to_string(),
|
||||||
|
user_id: user_id.to_string(),
|
||||||
|
in_reply_to_id: in_reply_to_id.as_ref().map(|x| x.to_string()),
|
||||||
|
},
|
||||||
|
DomainEvent::ThoughtDeleted { thought_id, user_id } => Self::ThoughtDeleted {
|
||||||
|
thought_id: thought_id.to_string(), user_id: user_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::ThoughtUpdated { thought_id, user_id } => Self::ThoughtUpdated {
|
||||||
|
thought_id: thought_id.to_string(), user_id: user_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::LikeAdded { like_id, user_id, thought_id } => Self::LikeAdded {
|
||||||
|
like_id: like_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::LikeRemoved { user_id, thought_id } => Self::LikeRemoved {
|
||||||
|
user_id: user_id.to_string(), thought_id: thought_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::BoostAdded { boost_id, user_id, thought_id } => Self::BoostAdded {
|
||||||
|
boost_id: boost_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::BoostRemoved { user_id, thought_id } => Self::BoostRemoved {
|
||||||
|
user_id: user_id.to_string(), thought_id: thought_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::FollowRequested { follower_id, following_id } => Self::FollowRequested {
|
||||||
|
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::FollowAccepted { follower_id, following_id } => Self::FollowAccepted {
|
||||||
|
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::FollowRejected { follower_id, following_id } => Self::FollowRejected {
|
||||||
|
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::Unfollowed { follower_id, following_id } => Self::Unfollowed {
|
||||||
|
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked {
|
||||||
|
blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── EventPayload → DomainEvent ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
fn parse_uuid(s: &str, field: &str) -> Result<uuid::Uuid, DomainError> {
|
||||||
|
uuid::Uuid::parse_str(s)
|
||||||
|
.map_err(|_| DomainError::Internal(format!("invalid uuid for {field}: {s}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<EventPayload> for DomainEvent {
|
||||||
|
type Error = DomainError;
|
||||||
|
|
||||||
|
fn try_from(p: EventPayload) -> Result<Self, DomainError> {
|
||||||
|
Ok(match p {
|
||||||
|
EventPayload::ThoughtCreated { thought_id, user_id, in_reply_to_id } => DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
in_reply_to_id: in_reply_to_id
|
||||||
|
.map(|s| parse_uuid(&s, "in_reply_to_id").map(ThoughtId::from_uuid))
|
||||||
|
.transpose()?,
|
||||||
|
},
|
||||||
|
EventPayload::ThoughtDeleted { thought_id, user_id } => DomainEvent::ThoughtDeleted {
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::ThoughtUpdated { thought_id, user_id } => DomainEvent::ThoughtUpdated {
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::LikeAdded { like_id, user_id, thought_id } => DomainEvent::LikeAdded {
|
||||||
|
like_id: LikeId::from_uuid(parse_uuid(&like_id, "like_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::LikeRemoved { user_id, thought_id } => DomainEvent::LikeRemoved {
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::BoostAdded { boost_id, user_id, thought_id } => DomainEvent::BoostAdded {
|
||||||
|
boost_id: BoostId::from_uuid(parse_uuid(&boost_id, "boost_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::BoostRemoved { user_id, thought_id } => DomainEvent::BoostRemoved {
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::FollowRequested { follower_id, following_id } => DomainEvent::FollowRequested {
|
||||||
|
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
|
||||||
|
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::FollowAccepted { follower_id, following_id } => DomainEvent::FollowAccepted {
|
||||||
|
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
|
||||||
|
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::FollowRejected { follower_id, following_id } => DomainEvent::FollowRejected {
|
||||||
|
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
|
||||||
|
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::Unfollowed { follower_id, following_id } => DomainEvent::Unfollowed {
|
||||||
|
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
|
||||||
|
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::UserBlocked { blocker_id, blocked_id } => DomainEvent::UserBlocked {
|
||||||
|
blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
|
||||||
|
blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── NatsEventPublisher ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
pub struct NatsEventPublisher {
|
||||||
|
client: async_nats::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NatsEventPublisher {
|
||||||
|
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventPublisher for NatsEventPublisher {
|
||||||
|
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let payload = EventPayload::from(event);
|
||||||
|
let subject = payload.subject();
|
||||||
|
let bytes = serde_json::to_vec(&payload)
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
self.client
|
||||||
|
.publish(subject, bytes.into())
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── NatsEventConsumer ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
pub struct NatsEventConsumer {
|
||||||
|
client: async_nats::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NatsEventConsumer {
|
||||||
|
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventConsumer for NatsEventConsumer {
|
||||||
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||||
|
let client = self.client.clone();
|
||||||
|
Box::pin(async_stream::try_stream! {
|
||||||
|
let mut sub = client
|
||||||
|
.subscribe(">")
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
|
use futures::StreamExt;
|
||||||
|
while let Some(msg) = sub.next().await {
|
||||||
|
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("failed to deserialize event payload: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let event = match DomainEvent::try_from(payload) {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("failed to convert payload to domain event: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Basic NATS has no ack/nack — at-most-once delivery
|
||||||
|
yield EventEnvelope {
|
||||||
|
event,
|
||||||
|
ack: Box::new(|| {}),
|
||||||
|
nack: Box::new(|| {}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p nats`
|
||||||
|
Expected: 2 tests pass.
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add crates/adapters/nats/
|
||||||
|
git commit -m "feat(nats): NatsEventPublisher and NatsEventConsumer with payload conversion"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 3: worker — NotificationHandler + FederationHandler
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/worker/Cargo.toml`
|
||||||
|
- Create: `crates/worker/src/handlers.rs`
|
||||||
|
|
||||||
|
- [ ] **Write `crates/worker/Cargo.toml`:**
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[package]
|
||||||
|
name = "worker"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "thoughts-worker"
|
||||||
|
path = "src/main.rs"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
domain = { workspace = true }
|
||||||
|
nats = { workspace = true }
|
||||||
|
event-payload = { workspace = true }
|
||||||
|
postgres = { workspace = true }
|
||||||
|
async-nats = { workspace = true }
|
||||||
|
tokio = { workspace = true, features = ["full"] }
|
||||||
|
futures = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
tracing-subscriber = { workspace = true }
|
||||||
|
dotenvy = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
chrono = { workspace = true }
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Write tests** at bottom of `crates/worker/src/handlers.rs`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use domain::{
|
||||||
|
models::{thought::{Thought, Visibility}, user::User},
|
||||||
|
testing::TestStore,
|
||||||
|
value_objects::*,
|
||||||
|
};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
fn alice() -> User {
|
||||||
|
User::new_local(
|
||||||
|
UserId::new(),
|
||||||
|
Username::new("alice").unwrap(),
|
||||||
|
Email::new("alice@ex.com").unwrap(),
|
||||||
|
PasswordHash("h".into()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn like_added_creates_notification_for_thought_author() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let alice = alice();
|
||||||
|
let bob_id = UserId::new();
|
||||||
|
|
||||||
|
// alice posts a thought
|
||||||
|
let thought = Thought::new_local(
|
||||||
|
ThoughtId::new(), alice.id.clone(),
|
||||||
|
Content::new_local("hello").unwrap(),
|
||||||
|
None, Visibility::Public, None, false,
|
||||||
|
);
|
||||||
|
store.users.lock().unwrap().push(alice.clone());
|
||||||
|
store.thoughts.lock().unwrap().push(thought.clone());
|
||||||
|
|
||||||
|
let handler = NotificationHandler {
|
||||||
|
thoughts: Arc::new(store.clone()),
|
||||||
|
notifications: Arc::new(store.clone()),
|
||||||
|
};
|
||||||
|
|
||||||
|
// bob likes alice's thought
|
||||||
|
handler.handle(&DomainEvent::LikeAdded {
|
||||||
|
like_id: LikeId::new(),
|
||||||
|
user_id: bob_id.clone(),
|
||||||
|
thought_id: thought.id.clone(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
let notifs = store.notifications.lock().unwrap();
|
||||||
|
assert_eq!(notifs.len(), 1);
|
||||||
|
assert_eq!(notifs[0].user_id, alice.id); // notification goes to alice
|
||||||
|
assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Like));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn self_like_does_not_create_notification() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let alice = alice();
|
||||||
|
let thought = Thought::new_local(
|
||||||
|
ThoughtId::new(), alice.id.clone(),
|
||||||
|
Content::new_local("hello").unwrap(),
|
||||||
|
None, Visibility::Public, None, false,
|
||||||
|
);
|
||||||
|
store.users.lock().unwrap().push(alice.clone());
|
||||||
|
store.thoughts.lock().unwrap().push(thought.clone());
|
||||||
|
|
||||||
|
let handler = NotificationHandler {
|
||||||
|
thoughts: Arc::new(store.clone()),
|
||||||
|
notifications: Arc::new(store.clone()),
|
||||||
|
};
|
||||||
|
|
||||||
|
handler.handle(&DomainEvent::LikeAdded {
|
||||||
|
like_id: LikeId::new(),
|
||||||
|
user_id: alice.id.clone(), // alice likes her own thought
|
||||||
|
thought_id: thought.id.clone(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert!(store.notifications.lock().unwrap().is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn follow_accepted_creates_notification() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let alice = alice();
|
||||||
|
let bob_id = UserId::new();
|
||||||
|
store.users.lock().unwrap().push(alice.clone());
|
||||||
|
|
||||||
|
let handler = NotificationHandler {
|
||||||
|
thoughts: Arc::new(store.clone()),
|
||||||
|
notifications: Arc::new(store.clone()),
|
||||||
|
};
|
||||||
|
|
||||||
|
// bob follows alice (alice gets notified)
|
||||||
|
handler.handle(&DomainEvent::FollowAccepted {
|
||||||
|
follower_id: bob_id.clone(),
|
||||||
|
following_id: alice.id.clone(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
let notifs = store.notifications.lock().unwrap();
|
||||||
|
assert_eq!(notifs.len(), 1);
|
||||||
|
assert_eq!(notifs[0].user_id, alice.id);
|
||||||
|
assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Follow));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p worker` — Expected: FAIL (handlers.rs doesn't exist yet).
|
||||||
|
|
||||||
|
- [ ] **Create `crates/worker/src/handlers.rs`:**
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use std::sync::Arc;
|
||||||
|
use chrono::Utc;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
models::notification::{Notification, NotificationType},
|
||||||
|
ports::{NotificationRepository, ThoughtRepository},
|
||||||
|
value_objects::NotificationId,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Handles domain events that should create notifications for users.
|
||||||
|
pub struct NotificationHandler {
|
||||||
|
pub thoughts: Arc<dyn ThoughtRepository>,
|
||||||
|
pub notifications: Arc<dyn NotificationRepository>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NotificationHandler {
|
||||||
|
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
match event {
|
||||||
|
DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => {
|
||||||
|
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||||
|
Some(t) => t,
|
||||||
|
None => return Ok(()), // thought deleted — skip
|
||||||
|
};
|
||||||
|
if thought.user_id == *user_id { return Ok(()); } // no self-notifications
|
||||||
|
self.notifications.save(&Notification {
|
||||||
|
id: NotificationId::new(),
|
||||||
|
user_id: thought.user_id,
|
||||||
|
notification_type: NotificationType::Like,
|
||||||
|
from_user_id: Some(user_id.clone()),
|
||||||
|
thought_id: Some(thought_id.clone()),
|
||||||
|
read: false,
|
||||||
|
created_at: Utc::now(),
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => {
|
||||||
|
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||||
|
Some(t) => t,
|
||||||
|
None => return Ok(()),
|
||||||
|
};
|
||||||
|
if thought.user_id == *user_id { return Ok(()); }
|
||||||
|
self.notifications.save(&Notification {
|
||||||
|
id: NotificationId::new(),
|
||||||
|
user_id: thought.user_id,
|
||||||
|
notification_type: NotificationType::Boost,
|
||||||
|
from_user_id: Some(user_id.clone()),
|
||||||
|
thought_id: Some(thought_id.clone()),
|
||||||
|
read: false,
|
||||||
|
created_at: Utc::now(),
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
DomainEvent::FollowAccepted { follower_id, following_id } => {
|
||||||
|
// The person being followed (following_id) gets notified
|
||||||
|
self.notifications.save(&Notification {
|
||||||
|
id: NotificationId::new(),
|
||||||
|
user_id: following_id.clone(),
|
||||||
|
notification_type: NotificationType::Follow,
|
||||||
|
from_user_id: Some(follower_id.clone()),
|
||||||
|
thought_id: None,
|
||||||
|
read: false,
|
||||||
|
created_at: Utc::now(),
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
// All other events: no notification needed in Plan 3
|
||||||
|
_ => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stub handler for ActivityPub federation — implemented in Plan 4.
|
||||||
|
pub struct FederationHandler;
|
||||||
|
|
||||||
|
impl FederationHandler {
|
||||||
|
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
tracing::debug!(event = ?event, "federation handler (stub — Plan 4)");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p worker` — Expected: 3 tests pass.
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add crates/worker/
|
||||||
|
git commit -m "feat(worker): NotificationHandler and FederationHandler stub"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 4: worker main binary
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/worker/src/main.rs`
|
||||||
|
|
||||||
|
- [ ] **Write `crates/worker/src/main.rs`:**
|
||||||
|
|
||||||
|
```rust
|
||||||
|
mod handlers;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use domain::ports::EventConsumer;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
dotenvy::dotenv().ok();
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
|
||||||
|
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
|
||||||
|
|
||||||
|
tracing::info!("Connecting to postgres...");
|
||||||
|
let pool = PgPool::connect(&database_url).await.expect("DB connect failed");
|
||||||
|
|
||||||
|
tracing::info!("Connecting to NATS at {nats_url}...");
|
||||||
|
let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed");
|
||||||
|
let consumer = nats::NatsEventConsumer::new(nats_client);
|
||||||
|
|
||||||
|
let notification_handler = handlers::NotificationHandler {
|
||||||
|
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
|
||||||
|
notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
|
||||||
|
};
|
||||||
|
let federation_handler = handlers::FederationHandler;
|
||||||
|
|
||||||
|
tracing::info!("Worker started, consuming events...");
|
||||||
|
|
||||||
|
let mut stream = consumer.consume();
|
||||||
|
while let Some(result) = stream.next().await {
|
||||||
|
match result {
|
||||||
|
Ok(envelope) => {
|
||||||
|
let event = &envelope.event;
|
||||||
|
tracing::debug!(subject = ?event, "received event");
|
||||||
|
|
||||||
|
let n_result = notification_handler.handle(event).await;
|
||||||
|
let f_result = federation_handler.handle(event).await;
|
||||||
|
|
||||||
|
if n_result.is_ok() && f_result.is_ok() {
|
||||||
|
(envelope.ack)();
|
||||||
|
} else {
|
||||||
|
if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); }
|
||||||
|
if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); }
|
||||||
|
(envelope.nack)();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("consumer error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo build -p worker`
|
||||||
|
Expected: compiles cleanly (binary `thoughts-worker` produced).
|
||||||
|
|
||||||
|
- [ ] **Smoke test** (requires NATS running):
|
||||||
|
```bash
|
||||||
|
# Terminal 1: start NATS if not already running
|
||||||
|
docker run -d --name nats -p 4222:4222 nats:latest || true
|
||||||
|
|
||||||
|
# Terminal 2: start worker
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \
|
||||||
|
RUST_LOG=info \
|
||||||
|
cargo run --bin thoughts-worker &
|
||||||
|
sleep 2
|
||||||
|
|
||||||
|
# Terminal 3: start API server
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev cargo run -p presentation &
|
||||||
|
sleep 2
|
||||||
|
|
||||||
|
# Create a user, post a thought, like it — check that worker logs "received event"
|
||||||
|
TOKEN=$(curl -s -X POST http://localhost:3000/auth/register \
|
||||||
|
-H 'content-type: application/json' \
|
||||||
|
-d '{"username":"evttest","email":"evt@test.com","password":"pw"}' | jq -r .token)
|
||||||
|
|
||||||
|
TID=$(curl -s -X POST http://localhost:3000/thoughts \
|
||||||
|
-H 'content-type: application/json' \
|
||||||
|
-H "Authorization: Bearer $TOKEN" \
|
||||||
|
-d '{"content":"event test"}' | jq -r .id)
|
||||||
|
|
||||||
|
curl -s -X POST http://localhost:3000/thoughts/$TID/like \
|
||||||
|
-H "Authorization: Bearer $TOKEN"
|
||||||
|
|
||||||
|
kill %1 %2 2>/dev/null
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: worker logs show `received event` for the like. No errors.
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add crates/worker/src/main.rs
|
||||||
|
git commit -m "feat(worker): consumer loop binary connecting NATS to handlers"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 5: Presentation — swap NoOp for real NatsEventPublisher
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/presentation/Cargo.toml`
|
||||||
|
- Modify: `crates/presentation/src/lib.rs`
|
||||||
|
- Modify: `crates/presentation/src/main.rs`
|
||||||
|
|
||||||
|
When NATS_URL is not set, fall back to the `NoOpEventPublisher` so the API still starts without NATS. Use an env var `NATS_URL` — if set, use real publisher; if absent, log a warning and use no-op.
|
||||||
|
|
||||||
|
- [ ] **Add `nats` to `crates/presentation/Cargo.toml` deps:**
|
||||||
|
|
||||||
|
```toml
|
||||||
|
nats = { workspace = true }
|
||||||
|
async-nats = { workspace = true }
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Update `crates/presentation/src/lib.rs`** — replace the `NoOpEventPublisher` struct and `build_state` function with one that optionally connects to NATS:
|
||||||
|
|
||||||
|
Replace the existing `build_state` signature with an async version:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use std::sync::Arc;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use state::AppState;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||||
|
|
||||||
|
pub mod errors;
|
||||||
|
pub mod extractors;
|
||||||
|
pub mod handlers;
|
||||||
|
pub mod routes;
|
||||||
|
pub mod state;
|
||||||
|
|
||||||
|
use postgres_search::PgSearchRepository;
|
||||||
|
|
||||||
|
struct NoOpEventPublisher;
|
||||||
|
#[async_trait]
|
||||||
|
impl EventPublisher for NoOpEventPublisher {
|
||||||
|
async fn publish(&self, _e: &DomainEvent) -> Result<(), DomainError> { Ok(()) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn build_state(pool: PgPool, jwt_secret: String) -> AppState {
|
||||||
|
let event_publisher: Arc<dyn EventPublisher> = match std::env::var("NATS_URL") {
|
||||||
|
Ok(url) => {
|
||||||
|
match async_nats::connect(&url).await {
|
||||||
|
Ok(client) => {
|
||||||
|
tracing::info!("Connected to NATS at {url}");
|
||||||
|
Arc::new(nats::NatsEventPublisher::new(client))
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("Failed to connect to NATS at {url}: {e} — using no-op publisher");
|
||||||
|
Arc::new(NoOpEventPublisher)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
tracing::info!("NATS_URL not set — using no-op event publisher");
|
||||||
|
Arc::new(NoOpEventPublisher)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
AppState {
|
||||||
|
users: Arc::new(postgres::user::PgUserRepository::new(pool.clone())),
|
||||||
|
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
|
||||||
|
likes: Arc::new(postgres::like::PgLikeRepository::new(pool.clone())),
|
||||||
|
boosts: Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())),
|
||||||
|
follows: Arc::new(postgres::follow::PgFollowRepository::new(pool.clone())),
|
||||||
|
blocks: Arc::new(postgres::block::PgBlockRepository::new(pool.clone())),
|
||||||
|
tags: Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
|
||||||
|
api_keys: Arc::new(postgres::api_key::PgApiKeyRepository::new(pool.clone())),
|
||||||
|
top_friends: Arc::new(postgres::top_friend::PgTopFriendRepository::new(pool.clone())),
|
||||||
|
notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
|
||||||
|
remote_actors: Arc::new(postgres::remote_actor::PgRemoteActorRepository::new(pool.clone())),
|
||||||
|
feed: Arc::new(postgres::feed::PgFeedRepository::new(pool.clone())),
|
||||||
|
search: Arc::new(PgSearchRepository::new(pool.clone())),
|
||||||
|
auth: Arc::new(auth::JwtAuthService::new(jwt_secret, 86400 * 30)),
|
||||||
|
hasher: Arc::new(auth::Argon2PasswordHasher),
|
||||||
|
events: event_publisher,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Update `crates/presentation/src/main.rs`** — `build_state` is now async, so await it:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use tower_http::cors::CorsLayer;
|
||||||
|
use tracing_subscriber::EnvFilter;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
dotenvy::dotenv().ok();
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(EnvFilter::from_default_env())
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
|
||||||
|
let jwt_secret = std::env::var("JWT_SECRET").expect("JWT_SECRET required");
|
||||||
|
let port = std::env::var("PORT").unwrap_or_else(|_| "3000".into());
|
||||||
|
|
||||||
|
let pool = PgPool::connect(&database_url).await.expect("DB connect failed");
|
||||||
|
sqlx::migrate!("../adapters/postgres/migrations").run(&pool).await.expect("Migrations failed");
|
||||||
|
|
||||||
|
let state = presentation::build_state(pool, jwt_secret).await; // note: .await
|
||||||
|
let app = presentation::routes::router()
|
||||||
|
.with_state(state)
|
||||||
|
.layer(CorsLayer::permissive());
|
||||||
|
|
||||||
|
let addr = format!("0.0.0.0:{port}");
|
||||||
|
tracing::info!("Listening on {addr}");
|
||||||
|
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
|
||||||
|
axum::serve(listener, app).await.unwrap();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo build -p presentation`
|
||||||
|
Expected: clean build.
|
||||||
|
|
||||||
|
- [ ] **Verify no-op fallback works** (without NATS running):
|
||||||
|
```bash
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev \
|
||||||
|
RUST_LOG=info cargo run -p presentation &
|
||||||
|
sleep 2
|
||||||
|
# Should log: "NATS_URL not set — using no-op event publisher"
|
||||||
|
curl -s -X POST http://localhost:3000/auth/register \
|
||||||
|
-H 'content-type: application/json' \
|
||||||
|
-d '{"username":"natstest","email":"nats@test.com","password":"pw"}' | jq .token
|
||||||
|
kill %1
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run full test suite:**
|
||||||
|
```bash
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace
|
||||||
|
```
|
||||||
|
Expected: all tests pass (52 + new worker tests = 55+).
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add crates/presentation/
|
||||||
|
git commit -m "feat(presentation): NatsEventPublisher with no-op fallback when NATS_URL unset"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Self-Review
|
||||||
|
|
||||||
|
**Spec coverage:**
|
||||||
|
- ✅ event-payload: serializable EventPayload enum, subject(), From/TryFrom conversions (Task 1)
|
||||||
|
- ✅ nats: NatsEventPublisher implementing EventPublisher (Task 2)
|
||||||
|
- ✅ nats: NatsEventConsumer implementing EventConsumer via BoxStream (Task 2)
|
||||||
|
- ✅ worker: NotificationHandler (LikeAdded, BoostAdded, FollowAccepted → notifications) (Task 3)
|
||||||
|
- ✅ worker: FederationHandler stub (Task 3)
|
||||||
|
- ✅ worker: consumer loop binary (Task 4)
|
||||||
|
- ✅ presentation: real NATS publisher with graceful no-op fallback (Task 5)
|
||||||
|
- ✅ event-publisher: stays as stub (correct — deferred per plan)
|
||||||
|
|
||||||
|
**Placeholder scan:** None — all code blocks complete.
|
||||||
|
|
||||||
|
**Type consistency:**
|
||||||
|
- `NatsEventPublisher::new(client: async_nats::Client)` — matches usage in presentation lib.rs and worker main.rs
|
||||||
|
- `NatsEventConsumer::new(client: async_nats::Client)` — matches worker main.rs
|
||||||
|
- `NotificationHandler { thoughts, notifications }` — field names match handler usage in main.rs
|
||||||
|
- `build_state` is now `async fn` — main.rs correctly awaits it
|
||||||
|
- `EventPayload::from(&DomainEvent)` — implemented in nats crate (which sees both types)
|
||||||
|
|
||||||
|
**Notes:**
|
||||||
|
- Basic NATS (at-most-once delivery) is used — JetStream (exactly-once) deferred to later
|
||||||
|
- Worker Cargo.toml includes `postgres` internal crate for database access in handlers
|
||||||
|
- `crates/adapters/nats` has Rust module name `nats` but package name `nats` — import as `use nats::...`
|
||||||
Reference in New Issue
Block a user