Compare commits

...

3 Commits

7 changed files with 518 additions and 3 deletions

View File

@@ -61,6 +61,13 @@ pub enum EventPayload {
blocker_id: String, blocker_id: String,
blocked_id: String, blocked_id: String,
}, },
UserUnblocked {
blocker_id: String,
blocked_id: String,
},
UserRegistered {
user_id: String,
},
} }
impl EventPayload { impl EventPayload {
@@ -79,6 +86,8 @@ impl EventPayload {
Self::FollowRejected { .. } => "follows.rejected", Self::FollowRejected { .. } => "follows.rejected",
Self::Unfollowed { .. } => "follows.removed", Self::Unfollowed { .. } => "follows.removed",
Self::UserBlocked { .. } => "users.blocked", Self::UserBlocked { .. } => "users.blocked",
Self::UserUnblocked { .. } => "users.unblocked",
Self::UserRegistered { .. } => "users.registered",
} }
} }
} }
@@ -126,6 +135,12 @@ impl From<&DomainEvent> for EventPayload {
DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked { DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked {
blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(), blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(),
}, },
DomainEvent::UserUnblocked { blocker_id, blocked_id } => Self::UserUnblocked {
blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(),
},
DomainEvent::UserRegistered { user_id } => Self::UserRegistered {
user_id: user_id.to_string(),
},
} }
} }
} }
@@ -195,6 +210,13 @@ impl TryFrom<EventPayload> for DomainEvent {
blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?), blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?), blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
}, },
EventPayload::UserUnblocked { blocker_id, blocked_id } => DomainEvent::UserUnblocked {
blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
},
EventPayload::UserRegistered { user_id } => DomainEvent::UserRegistered {
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
},
}) })
} }
} }

View File

@@ -1,5 +1,6 @@
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
events::DomainEvent,
models::user::User, models::user::User,
ports::{AuthService, EventPublisher, PasswordHasher, UserRepository}, ports::{AuthService, EventPublisher, PasswordHasher, UserRepository},
value_objects::{Email, UserId, Username}, value_objects::{Email, UserId, Username},
@@ -13,7 +14,7 @@ pub async fn register(
users: &dyn UserRepository, users: &dyn UserRepository,
hasher: &dyn PasswordHasher, hasher: &dyn PasswordHasher,
auth: &dyn AuthService, auth: &dyn AuthService,
_events: &dyn EventPublisher, events: &dyn EventPublisher,
input: RegisterInput, input: RegisterInput,
) -> Result<RegisterOutput, DomainError> { ) -> Result<RegisterOutput, DomainError> {
let username = Username::new(input.username)?; let username = Username::new(input.username)?;
@@ -27,6 +28,7 @@ pub async fn register(
let hash = hasher.hash(&input.password).await?; let hash = hasher.hash(&input.password).await?;
let user = User::new_local(UserId::new(), username, email, hash); let user = User::new_local(UserId::new(), username, email, hash);
users.save(&user).await?; users.save(&user).await?;
events.publish(&DomainEvent::UserRegistered { user_id: user.id.clone() }).await?;
let token = auth.generate_token(&user.id)?; let token = auth.generate_token(&user.id)?;
Ok(RegisterOutput { user, token: token.token }) Ok(RegisterOutput { user, token: token.token })
} }
@@ -56,6 +58,7 @@ mod tests {
use async_trait::async_trait; use async_trait::async_trait;
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
events::DomainEvent,
ports::{AuthService, GeneratedToken, PasswordHasher}, ports::{AuthService, GeneratedToken, PasswordHasher},
testing::{NoOpEventPublisher, TestStore}, testing::{NoOpEventPublisher, TestStore},
value_objects::{PasswordHash, UserId}, value_objects::{PasswordHash, UserId},
@@ -112,4 +115,13 @@ mod tests {
let err = login(&store, &FakeHasher, &FakeAuth, LoginInput { email: "alice@ex.com".into(), password: "wrong".into() }).await.unwrap_err(); let err = login(&store, &FakeHasher, &FakeAuth, LoginInput { email: "alice@ex.com".into(), password: "wrong".into() }).await.unwrap_err();
assert!(matches!(err, DomainError::Unauthorized)); assert!(matches!(err, DomainError::Unauthorized));
} }
#[tokio::test]
async fn register_publishes_user_registered_event() {
let store = TestStore::default();
register(&store, &FakeHasher, &FakeAuth, &store, input()).await.unwrap();
let events = store.events.lock().unwrap();
assert_eq!(events.len(), 1);
assert!(matches!(events[0], DomainEvent::UserRegistered { .. }));
}
} }

View File

@@ -67,8 +67,17 @@ pub async fn block_user(blocks: &dyn BlockRepository, events: &dyn EventPublishe
Ok(()) Ok(())
} }
pub async fn unblock_user(blocks: &dyn BlockRepository, blocker_id: &UserId, blocked_id: &UserId) -> Result<(), DomainError> { pub async fn unblock_user(
blocks: &dyn BlockRepository,
events: &dyn EventPublisher,
blocker_id: &UserId,
blocked_id: &UserId,
) -> Result<(), DomainError> {
blocks.delete(blocker_id, blocked_id).await?; blocks.delete(blocker_id, blocked_id).await?;
events.publish(&DomainEvent::UserUnblocked {
blocker_id: blocker_id.clone(),
blocked_id: blocked_id.clone(),
}).await?;
Ok(()) Ok(())
} }
@@ -114,4 +123,17 @@ mod tests {
let err = follow_user(&store, &store, &alice.id, &alice.id).await.unwrap_err(); let err = follow_user(&store, &store, &alice.id, &alice.id).await.unwrap_err();
assert!(matches!(err, DomainError::InvalidInput(_))); assert!(matches!(err, DomainError::InvalidInput(_)));
} }
#[tokio::test]
async fn unblock_user_publishes_event() {
let store = TestStore::default();
let alice = user("alice");
let bob = user("bob");
block_user(&store, &store, &alice.id, &bob.id).await.unwrap();
store.events.lock().unwrap().clear();
unblock_user(&store, &store, &alice.id, &bob.id).await.unwrap();
let events = store.events.lock().unwrap();
assert_eq!(events.len(), 1);
assert!(matches!(events[0], DomainEvent::UserUnblocked { .. }));
}
} }

View File

@@ -14,6 +14,8 @@ pub enum DomainEvent {
FollowRejected { follower_id: UserId, following_id: UserId }, FollowRejected { follower_id: UserId, following_id: UserId },
Unfollowed { follower_id: UserId, following_id: UserId }, Unfollowed { follower_id: UserId, following_id: UserId },
UserBlocked { blocker_id: UserId, blocked_id: UserId }, UserBlocked { blocker_id: UserId, blocked_id: UserId },
UserUnblocked { blocker_id: UserId, blocked_id: UserId },
UserRegistered { user_id: UserId },
} }
pub struct EventEnvelope { pub struct EventEnvelope {

View File

@@ -35,7 +35,7 @@ pub async fn post_block(State(s): State<AppState>, AuthUser(uid): AuthUser, Path
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
pub async fn delete_block(State(s): State<AppState>, AuthUser(uid): AuthUser, Path(target): Path<Uuid>) -> Result<StatusCode, ApiError> { pub async fn delete_block(State(s): State<AppState>, AuthUser(uid): AuthUser, Path(target): Path<Uuid>) -> Result<StatusCode, ApiError> {
unblock_user(&*s.blocks, &uid, &UserId::from_uuid(target)).await?; unblock_user(&*s.blocks, &*s.events, &uid, &UserId::from_uuid(target)).await?;
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
pub async fn put_top_friends(State(s): State<AppState>, AuthUser(uid): AuthUser, Json(body): Json<SetTopFriendsRequest>) -> Result<StatusCode, ApiError> { pub async fn put_top_friends(State(s): State<AppState>, AuthUser(uid): AuthUser, Json(body): Json<SetTopFriendsRequest>) -> Result<StatusCode, ApiError> {

View File

@@ -61,6 +61,26 @@ impl NotificationHandler {
created_at: Utc::now(), created_at: Utc::now(),
}).await }).await
} }
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => {
let reply_to_id = match in_reply_to_id {
Some(id) => id,
None => return Ok(()), // not a reply
};
let original = match self.thoughts.find_by_id(reply_to_id).await? {
Some(t) => t,
None => return Ok(()), // original deleted
};
if original.user_id == *user_id { return Ok(()); } // no self-notifications
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: original.user_id,
notification_type: NotificationType::Reply,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
read: false,
created_at: Utc::now(),
}).await
}
// All other events: no notification needed in Plan 3 // All other events: no notification needed in Plan 3
_ => Ok(()), _ => Ok(()),
} }
@@ -175,4 +195,81 @@ mod tests {
assert_eq!(notifs[0].user_id, alice.id); assert_eq!(notifs[0].user_id, alice.id);
assert!(matches!(notifs[0].notification_type, NotificationType::Follow)); assert!(matches!(notifs[0].notification_type, NotificationType::Follow));
} }
#[tokio::test]
async fn reply_creates_notification_for_original_author() {
let store = TestStore::default();
let alice = alice();
let bob_id = UserId::new();
let original = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("original thought").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(original.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: bob_id.clone(),
in_reply_to_id: Some(original.id.clone()),
}).await.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert_eq!(notifs[0].user_id, alice.id);
assert!(matches!(notifs[0].notification_type, NotificationType::Reply));
}
#[tokio::test]
async fn self_reply_does_not_create_notification() {
let store = TestStore::default();
let alice = alice();
let original = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("original").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(original.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: alice.id.clone(),
in_reply_to_id: Some(original.id.clone()),
}).await.unwrap();
assert!(store.notifications.lock().unwrap().is_empty());
}
#[tokio::test]
async fn thought_without_reply_to_creates_no_notification() {
let store = TestStore::default();
let alice = alice();
store.users.lock().unwrap().push(alice.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: alice.id.clone(),
in_reply_to_id: None,
}).await.unwrap();
assert!(store.notifications.lock().unwrap().is_empty());
}
} }

View File

@@ -0,0 +1,360 @@
# Audit Gap Fixes Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Close the three gaps found in the architectural audit: `unblock_user` publishing a `UserUnblocked` event, `register` publishing a `UserRegistered` event, and the worker creating Reply notifications when a thought is a reply.
**Architecture:** Two new `DomainEvent` variants (`UserUnblocked`, `UserRegistered`) ripple through the event pipeline: added to `events.rs`, serialised in `event-payload`, published in the affected use cases. The worker `NotificationHandler` gains a new arm for `ThoughtCreated` with an `in_reply_to_id`.
**Tech Stack:** Rust, existing domain/event-payload/application/worker crates
---
## File Map
```
Modify: crates/domain/src/events.rs ← add UserUnblocked + UserRegistered variants
Modify: crates/adapters/event-payload/src/lib.rs ← add variants + From<&DomainEvent> + TryFrom arms
Modify: crates/application/src/use_cases/social.rs ← unblock_user accepts events, publishes UserUnblocked
Modify: crates/application/src/use_cases/auth.rs ← register publishes UserRegistered
Modify: crates/presentation/src/handlers/social.rs ← delete_block passes &*s.events
Modify: crates/worker/src/handlers.rs ← ThoughtCreated arm → Reply notification
```
---
### Task 1: New DomainEvent variants + event-payload + use case fixes
**Files:**
- Modify: `crates/domain/src/events.rs`
- Modify: `crates/adapters/event-payload/src/lib.rs`
- Modify: `crates/application/src/use_cases/social.rs`
- Modify: `crates/application/src/use_cases/auth.rs`
- Modify: `crates/presentation/src/handlers/social.rs`
- [ ] **Write failing tests** — add to `crates/application/src/use_cases/social.rs` test module (bottom of existing `#[cfg(test)] mod tests`):
```rust
#[tokio::test]
async fn unblock_user_publishes_event() {
let store = TestStore::default();
let alice = user("alice");
let bob = user("bob");
// block first so we can unblock
block_user(&store, &store, &alice.id, &bob.id).await.unwrap();
store.events.lock().unwrap().clear(); // reset after block event
unblock_user(&store, &store, &alice.id, &bob.id).await.unwrap();
let events = store.events.lock().unwrap();
assert_eq!(events.len(), 1);
assert!(matches!(events[0], DomainEvent::UserUnblocked { .. }));
}
```
Add to `crates/application/src/use_cases/auth.rs` test module:
```rust
#[tokio::test]
async fn register_publishes_user_registered_event() {
let store = TestStore::default();
register(&store, &FakeHasher, &FakeAuth, &store, input()).await.unwrap();
let events = store.events.lock().unwrap();
assert_eq!(events.len(), 1);
assert!(matches!(events[0], DomainEvent::UserRegistered { .. }));
}
```
Note: in the auth test, `&store` is passed as the `events` argument (TestStore implements EventPublisher). The existing tests use `&NoOpEventPublisher` — leave those unchanged, they still pass. Only the new test passes `&store` to capture events.
- [ ] **Run:** `cargo test -p application` — Expected: FAIL (UserUnblocked + UserRegistered not defined).
- [ ] **Add variants to `crates/domain/src/events.rs`** — append two variants to the `DomainEvent` enum, after `UserBlocked`:
```rust
UserUnblocked { blocker_id: UserId, blocked_id: UserId },
UserRegistered { user_id: UserId },
```
- [ ] **Add variants to `crates/adapters/event-payload/src/lib.rs`**:
**In the `EventPayload` enum** — append after `UserBlocked`:
```rust
UserUnblocked {
blocker_id: String,
blocked_id: String,
},
UserRegistered {
user_id: String,
},
```
**In `subject()`** — append after the `Self::UserBlocked` arm:
```rust
Self::UserUnblocked { .. } => "users.unblocked",
Self::UserRegistered { .. } => "users.registered",
```
**In `impl From<&DomainEvent> for EventPayload`** — append after the `DomainEvent::UserBlocked` arm:
```rust
DomainEvent::UserUnblocked { blocker_id, blocked_id } => Self::UserUnblocked {
blocker_id: blocker_id.to_string(),
blocked_id: blocked_id.to_string(),
},
DomainEvent::UserRegistered { user_id } => Self::UserRegistered {
user_id: user_id.to_string(),
},
```
**In `impl TryFrom<EventPayload> for DomainEvent`** — append after the `EventPayload::UserBlocked` arm:
```rust
EventPayload::UserUnblocked { blocker_id, blocked_id } => DomainEvent::UserUnblocked {
blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
},
EventPayload::UserRegistered { user_id } => DomainEvent::UserRegistered {
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
},
```
- [ ] **Update `unblock_user` in `crates/application/src/use_cases/social.rs`**:
Replace the current function (which takes only `blocks` and two UserId params):
```rust
pub async fn unblock_user(blocks: &dyn BlockRepository, blocker_id: &UserId, blocked_id: &UserId) -> Result<(), DomainError> {
blocks.delete(blocker_id, blocked_id).await?;
Ok(())
}
```
With:
```rust
pub async fn unblock_user(
blocks: &dyn BlockRepository,
events: &dyn EventPublisher,
blocker_id: &UserId,
blocked_id: &UserId,
) -> Result<(), DomainError> {
blocks.delete(blocker_id, blocked_id).await?;
events.publish(&DomainEvent::UserUnblocked {
blocker_id: blocker_id.clone(),
blocked_id: blocked_id.clone(),
}).await?;
Ok(())
}
```
- [ ] **Update `register` in `crates/application/src/use_cases/auth.rs`**:
Change the parameter from `_events` to `events` (remove the underscore) and add one line after `users.save(&user).await?;`:
```rust
pub async fn register(
users: &dyn UserRepository,
hasher: &dyn PasswordHasher,
auth: &dyn AuthService,
events: &dyn EventPublisher, // ← remove leading underscore
input: RegisterInput,
) -> Result<RegisterOutput, DomainError> {
let username = Username::new(input.username)?;
let email = Email::new(input.email)?;
if users.find_by_username(&username).await?.is_some() {
return Err(DomainError::Conflict("username taken".into()));
}
if users.find_by_email(&email).await?.is_some() {
return Err(DomainError::Conflict("email taken".into()));
}
let hash = hasher.hash(&input.password).await?;
let user = User::new_local(UserId::new(), username, email, hash);
users.save(&user).await?;
events.publish(&DomainEvent::UserRegistered { user_id: user.id.clone() }).await?; // ← new
let token = auth.generate_token(&user.id)?;
Ok(RegisterOutput { user, token: token.token })
}
```
- [ ] **Update `delete_block` handler in `crates/presentation/src/handlers/social.rs`**:
The handler currently calls `unblock_user(&*s.blocks, &uid, &UserId::from_uuid(target))`. Add `&*s.events` as the second argument:
```rust
pub async fn delete_block(State(s): State<AppState>, AuthUser(uid): AuthUser, Path(target): Path<Uuid>) -> Result<StatusCode, ApiError> {
unblock_user(&*s.blocks, &*s.events, &uid, &UserId::from_uuid(target)).await?;
Ok(StatusCode::NO_CONTENT)
}
```
- [ ] **Run:** `cargo test -p application` — Expected: all tests pass including 2 new ones.
- [ ] **Run:** `cargo check --workspace` — Expected: no errors (the handler change + event-payload additions must compile).
- [ ] **Commit:**
```bash
git add crates/domain/src/events.rs \
crates/adapters/event-payload/src/lib.rs \
crates/application/src/use_cases/social.rs \
crates/application/src/use_cases/auth.rs \
crates/presentation/src/handlers/social.rs
git commit -m "feat: UserUnblocked + UserRegistered events, fix unblock_user and register signatures"
```
---
### Task 2: Reply notifications in worker
**Files:**
- Modify: `crates/worker/src/handlers.rs`
- [ ] **Write the failing test** — add to the existing `#[cfg(test)] mod tests` block in `crates/worker/src/handlers.rs`, after `follow_accepted_creates_notification`:
```rust
#[tokio::test]
async fn reply_creates_notification_for_original_author() {
let store = TestStore::default();
let alice = alice(); // author of the original thought
let bob_id = UserId::new(); // author of the reply
let original = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("original thought").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(original.clone());
let reply_id = ThoughtId::new();
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
// ThoughtCreated with in_reply_to_id pointing at alice's thought
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: reply_id,
user_id: bob_id.clone(),
in_reply_to_id: Some(original.id.clone()),
}).await.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert_eq!(notifs[0].user_id, alice.id);
assert!(matches!(notifs[0].notification_type, NotificationType::Reply));
}
#[tokio::test]
async fn self_reply_does_not_create_notification() {
let store = TestStore::default();
let alice = alice();
let original = Thought::new_local(
ThoughtId::new(), alice.id.clone(),
Content::new_local("original").unwrap(),
None, Visibility::Public, None, false,
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(original.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: alice.id.clone(), // alice replying to herself
in_reply_to_id: Some(original.id.clone()),
}).await.unwrap();
assert!(store.notifications.lock().unwrap().is_empty());
}
#[tokio::test]
async fn thought_without_reply_to_creates_no_notification() {
let store = TestStore::default();
let alice = alice();
store.users.lock().unwrap().push(alice.clone());
let handler = NotificationHandler {
thoughts: Arc::new(store.clone()),
notifications: Arc::new(store.clone()),
};
handler.handle(&DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: alice.id.clone(),
in_reply_to_id: None, // not a reply
}).await.unwrap();
assert!(store.notifications.lock().unwrap().is_empty());
}
```
- [ ] **Run:** `DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test -p worker` — Expected: FAIL on 3 new tests (reply handling not implemented).
- [ ] **Add the `ThoughtCreated` arm** to `NotificationHandler::handle` in `crates/worker/src/handlers.rs` — insert before the final `_ => Ok(()),` arm:
```rust
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => {
let reply_to_id = match in_reply_to_id {
Some(id) => id,
None => return Ok(()), // not a reply — no notification needed
};
let original = match self.thoughts.find_by_id(reply_to_id).await? {
Some(t) => t,
None => return Ok(()), // original thought deleted — skip
};
if original.user_id == *user_id { return Ok(()); } // no self-notifications
self.notifications.save(&Notification {
id: NotificationId::new(),
user_id: original.user_id,
notification_type: NotificationType::Reply,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
read: false,
created_at: Utc::now(),
}).await
}
```
- [ ] **Run:** `cargo test -p worker` — Expected: all 6 tests pass (3 existing + 3 new).
- [ ] **Run full suite:** `DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3` — Expected: all tests pass.
- [ ] **Commit:**
```bash
git add crates/worker/src/handlers.rs
git commit -m "feat(worker): Reply notification when ThoughtCreated has in_reply_to_id"
```
---
## Self-Review
**Spec coverage:**
-`UserUnblocked` added to DomainEvent (Task 1)
-`UserRegistered` added to DomainEvent (Task 1)
- ✅ Both variants added to EventPayload with subject routing (Task 1)
- ✅ Both variants covered in From<&DomainEvent> and TryFrom<EventPayload> (Task 1)
-`unblock_user` now accepts `events` and publishes `UserUnblocked` (Task 1)
-`register` now publishes `UserRegistered` (Task 1)
-`delete_block` handler passes `&*s.events` (Task 1)
-`ThoughtCreated` with `in_reply_to_id` triggers Reply notification (Task 2)
- ✅ Self-reply suppressed (Task 2)
- ✅ Plain thought (no reply) triggers no notification (Task 2)
**Placeholder scan:** None.
**Type consistency:**
- `DomainEvent::UserUnblocked { blocker_id: UserId, blocked_id: UserId }` — matches use case publish call and EventPayload From arm
- `DomainEvent::UserRegistered { user_id: UserId }` — matches use case publish call and EventPayload From arm
- `NotificationType::Reply` — already exists in `domain/src/models/notification.rs`
- `unblock_user(blocks, events, blocker_id, blocked_id)` — matches updated handler call in `delete_block`
**Notes:**
- `NotificationType::Reply` was already defined in domain models (Plan 1) — no domain model change needed
- The `event-payload` `all_subjects_are_unique` test will catch duplicate NATS subjects — the new subjects "users.unblocked" and "users.registered" are unique