Audit Gap Fixes Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Close the three gaps found in the architectural audit: unblock_user publishing a UserUnblocked event, register publishing a UserRegistered event, and the worker creating Reply notifications when a thought is a reply.
Architecture: Two new DomainEvent variants (UserUnblocked, UserRegistered) ripple through the event pipeline: added to events.rs, serialised in event-payload, published in the affected use cases. The worker NotificationHandler gains a new arm for ThoughtCreated with an in_reply_to_id.
Tech Stack: Rust, existing domain/event-payload/application/worker crates
File Map
Modify: crates/domain/src/events.rs ← add UserUnblocked + UserRegistered variants
Modify: crates/adapters/event-payload/src/lib.rs ← add variants + From<&DomainEvent> + TryFrom arms
Modify: crates/application/src/use_cases/social.rs ← unblock_user accepts events, publishes UserUnblocked
Modify: crates/application/src/use_cases/auth.rs ← register publishes UserRegistered
Modify: crates/presentation/src/handlers/social.rs ← delete_block passes &*s.events
Modify: crates/worker/src/handlers.rs ← ThoughtCreated arm → Reply notification
Task 1: New DomainEvent variants + event-payload + use case fixes
Files:
- Modify: crates/domain/src/events.rs
- Modify: crates/adapters/event-payload/src/lib.rs
- Modify: crates/application/src/use_cases/social.rs
- Modify: crates/application/src/use_cases/auth.rs
- Modify: crates/presentation/src/handlers/social.rs
- Write failing tests. Add to the crates/application/src/use_cases/social.rs test module (bottom of the existing #[cfg(test)] mod tests):
#[tokio::test]
async fn unblock_user_publishes_event() {
    let store = TestStore::default();
    let alice = user("alice");
    let bob = user("bob");
    // block first so we can unblock
    block_user(&store, &store, &alice.id, &bob.id).await.unwrap();
    store.events.lock().unwrap().clear(); // reset after block event
    unblock_user(&store, &store, &alice.id, &bob.id).await.unwrap();
    let events = store.events.lock().unwrap();
    assert_eq!(events.len(), 1);
    assert!(matches!(events[0], DomainEvent::UserUnblocked { .. }));
}
Add to crates/application/src/use_cases/auth.rs test module:
#[tokio::test]
async fn register_publishes_user_registered_event() {
    let store = TestStore::default();
    register(&store, &FakeHasher, &FakeAuth, &store, input()).await.unwrap();
    let events = store.events.lock().unwrap();
    assert_eq!(events.len(), 1);
    assert!(matches!(events[0], DomainEvent::UserRegistered { .. }));
}
Note: in the auth test, &store is passed as the events argument (TestStore implements EventPublisher). The existing tests use &NoOpEventPublisher — leave those unchanged, they still pass. Only the new test passes &store to capture events.
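The note above relies on two kinds of publisher: one that discards events and one that records them. A minimal sketch of that distinction follows, assuming a synchronous stand-in for the EventPublisher trait and simplified UserId and DomainEvent types. The real trait is async and the real TestStore holds more state; register_sketch is a hypothetical helper used only for illustration.

```rust
use std::sync::Mutex;

// Simplified stand-ins for the real domain types (assumptions for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct UserId(pub u64);

#[derive(Debug, Clone, PartialEq)]
pub enum DomainEvent {
    UserRegistered { user_id: UserId },
}

// Synchronous stand-in for the async EventPublisher trait (assumption).
pub trait EventPublisher {
    fn publish(&self, event: &DomainEvent) -> Result<(), String>;
}

/// Discards every event; enough for tests that do not inspect events.
pub struct NoOpEventPublisher;

impl EventPublisher for NoOpEventPublisher {
    fn publish(&self, _event: &DomainEvent) -> Result<(), String> {
        Ok(())
    }
}

/// Records every event so a test can assert on what was published.
#[derive(Default)]
pub struct TestStore {
    pub events: Mutex<Vec<DomainEvent>>,
}

impl EventPublisher for TestStore {
    fn publish(&self, event: &DomainEvent) -> Result<(), String> {
        self.events.lock().unwrap().push(event.clone());
        Ok(())
    }
}

/// Hypothetical use case reduced to its publish call.
pub fn register_sketch(events: &dyn EventPublisher, id: u64) -> Result<(), String> {
    events.publish(&DomainEvent::UserRegistered { user_id: UserId(id) })
}
```

Passing &NoOpEventPublisher leaves nothing to assert on, which is why only the new test switches to &store.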
- Run: cargo test -p application
  Expected: FAIL (UserUnblocked and UserRegistered are not defined yet).
- Add variants to crates/domain/src/events.rs: append two variants to the DomainEvent enum, after UserBlocked:
UserUnblocked { blocker_id: UserId, blocked_id: UserId },
UserRegistered { user_id: UserId },
- Add variants to crates/adapters/event-payload/src/lib.rs:
In the EventPayload enum — append after UserBlocked:
UserUnblocked {
    blocker_id: String,
    blocked_id: String,
},
UserRegistered {
    user_id: String,
},
In subject() — append after the Self::UserBlocked arm:
Self::UserUnblocked { .. } => "users.unblocked",
Self::UserRegistered { .. } => "users.registered",
In impl From<&DomainEvent> for EventPayload — append after the DomainEvent::UserBlocked arm:
DomainEvent::UserUnblocked { blocker_id, blocked_id } => Self::UserUnblocked {
    blocker_id: blocker_id.to_string(),
    blocked_id: blocked_id.to_string(),
},
DomainEvent::UserRegistered { user_id } => Self::UserRegistered {
    user_id: user_id.to_string(),
},
In impl TryFrom<EventPayload> for DomainEvent — append after the EventPayload::UserBlocked arm:
EventPayload::UserUnblocked { blocker_id, blocked_id } => DomainEvent::UserUnblocked {
    blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
    blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
},
EventPayload::UserRegistered { user_id } => DomainEvent::UserRegistered {
    user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
},
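The From and TryFrom arms above are meant to be inverses of each other. The following sketch shows that round trip in isolation, under stated assumptions: UserId wraps a u64 instead of a UUID, the error type is a plain String, and parse_id is a hypothetical stand-in for the role parse_uuid plays in the real crate.

```rust
use std::convert::TryFrom;

// Stand-in: the real UserId wraps a UUID (assumption for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct UserId(pub u64);

#[derive(Debug, Clone, PartialEq)]
pub enum DomainEvent {
    UserUnblocked { blocker_id: UserId, blocked_id: UserId },
    UserRegistered { user_id: UserId },
}

#[derive(Debug, Clone, PartialEq)]
pub enum EventPayload {
    UserUnblocked { blocker_id: String, blocked_id: String },
    UserRegistered { user_id: String },
}

// Hypothetical helper: parses an id and names the offending field on failure.
fn parse_id(raw: &str, field: &str) -> Result<UserId, String> {
    raw.parse::<u64>()
        .map(UserId)
        .map_err(|e| format!("invalid {}: {}", field, e))
}

impl From<&DomainEvent> for EventPayload {
    fn from(event: &DomainEvent) -> Self {
        match event {
            DomainEvent::UserUnblocked { blocker_id, blocked_id } => Self::UserUnblocked {
                blocker_id: blocker_id.0.to_string(),
                blocked_id: blocked_id.0.to_string(),
            },
            DomainEvent::UserRegistered { user_id } => Self::UserRegistered {
                user_id: user_id.0.to_string(),
            },
        }
    }
}

impl TryFrom<EventPayload> for DomainEvent {
    type Error = String;

    fn try_from(payload: EventPayload) -> Result<Self, Self::Error> {
        Ok(match payload {
            EventPayload::UserUnblocked { blocker_id, blocked_id } => DomainEvent::UserUnblocked {
                blocker_id: parse_id(&blocker_id, "blocker_id")?,
                blocked_id: parse_id(&blocked_id, "blocked_id")?,
            },
            EventPayload::UserRegistered { user_id } => DomainEvent::UserRegistered {
                user_id: parse_id(&user_id, "user_id")?,
            },
        })
    }
}
```

A round-trip assertion of this kind (event to payload and back) is a cheap way to catch a field that was mapped in one direction but not the other.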
- Update unblock_user in crates/application/src/use_cases/social.rs:
Replace the current function (which takes only blocks and two UserId params):
pub async fn unblock_user(blocks: &dyn BlockRepository, blocker_id: &UserId, blocked_id: &UserId) -> Result<(), DomainError> {
    blocks.delete(blocker_id, blocked_id).await?;
    Ok(())
}
With:
pub async fn unblock_user(
    blocks: &dyn BlockRepository,
    events: &dyn EventPublisher,
    blocker_id: &UserId,
    blocked_id: &UserId,
) -> Result<(), DomainError> {
    blocks.delete(blocker_id, blocked_id).await?;
    events.publish(&DomainEvent::UserUnblocked {
        blocker_id: blocker_id.clone(),
        blocked_id: blocked_id.clone(),
    }).await?;
    Ok(())
}
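One consequence of the ordering in the new function is that the event is published only after the delete succeeds, because the ? operator returns early on a repository error. The sketch below illustrates this with synchronous stand-in traits and a String error type. FakeBlocks and Recorder are hypothetical test doubles, not types from the codebase.

```rust
use std::cell::RefCell;

// Simplified stand-ins for the real domain types (assumptions for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct UserId(pub u64);

#[derive(Debug, Clone, PartialEq)]
pub enum DomainEvent {
    UserUnblocked { blocker_id: UserId, blocked_id: UserId },
}

// Synchronous stand-ins for the async repository and publisher traits.
pub trait BlockRepository {
    fn delete(&self, blocker_id: &UserId, blocked_id: &UserId) -> Result<(), String>;
}

pub trait EventPublisher {
    fn publish(&self, event: &DomainEvent) -> Result<(), String>;
}

/// Hypothetical repository whose delete fails on demand.
pub struct FakeBlocks {
    pub fail: bool,
}

impl BlockRepository for FakeBlocks {
    fn delete(&self, _blocker_id: &UserId, _blocked_id: &UserId) -> Result<(), String> {
        if self.fail {
            Err("storage unavailable".to_string())
        } else {
            Ok(())
        }
    }
}

/// Hypothetical publisher that records what it receives.
#[derive(Default)]
pub struct Recorder {
    pub events: RefCell<Vec<DomainEvent>>,
}

impl EventPublisher for Recorder {
    fn publish(&self, event: &DomainEvent) -> Result<(), String> {
        self.events.borrow_mut().push(event.clone());
        Ok(())
    }
}

pub fn unblock_user(
    blocks: &dyn BlockRepository,
    events: &dyn EventPublisher,
    blocker_id: &UserId,
    blocked_id: &UserId,
) -> Result<(), String> {
    // A failed delete returns here, so no event is published for it.
    blocks.delete(blocker_id, blocked_id)?;
    events.publish(&DomainEvent::UserUnblocked {
        blocker_id: blocker_id.clone(),
        blocked_id: blocked_id.clone(),
    })?;
    Ok(())
}
```

The reverse case is not covered by this ordering: if the publish fails after a successful delete, the caller sees an error even though the block is already gone.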
- Update register in crates/application/src/use_cases/auth.rs:
Change the parameter from _events to events (remove the underscore) and add one line after users.save(&user).await?;:
pub async fn register(
    users: &dyn UserRepository,
    hasher: &dyn PasswordHasher,
    auth: &dyn AuthService,
    events: &dyn EventPublisher, // ← remove leading underscore
    input: RegisterInput,
) -> Result<RegisterOutput, DomainError> {
    let username = Username::new(input.username)?;
    let email = Email::new(input.email)?;
    if users.find_by_username(&username).await?.is_some() {
        return Err(DomainError::Conflict("username taken".into()));
    }
    if users.find_by_email(&email).await?.is_some() {
        return Err(DomainError::Conflict("email taken".into()));
    }
    let hash = hasher.hash(&input.password).await?;
    let user = User::new_local(UserId::new(), username, email, hash);
    users.save(&user).await?;
    events.publish(&DomainEvent::UserRegistered { user_id: user.id.clone() }).await?; // ← new
    let token = auth.generate_token(&user.id)?;
    Ok(RegisterOutput { user, token: token.token })
}
- Update the delete_block handler in crates/presentation/src/handlers/social.rs:
The handler currently calls unblock_user(&*s.blocks, &uid, &UserId::from_uuid(target)). Add &*s.events as the second argument:
pub async fn delete_block(State(s): State<AppState>, AuthUser(uid): AuthUser, Path(target): Path<Uuid>) -> Result<StatusCode, ApiError> {
    unblock_user(&*s.blocks, &*s.events, &uid, &UserId::from_uuid(target)).await?;
    Ok(StatusCode::NO_CONTENT)
}
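The &*s.events expression converts a smart pointer into the plain trait-object reference the use case expects. The sketch below assumes AppState stores its publisher as Arc<dyn EventPublisher>; the plan does not show the actual field type, so treat that as a guess. CountingPublisher and the two _sketch functions are hypothetical, and the trait is a synchronous stand-in.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Simplified, synchronous stand-in for the real async trait (assumption).
pub trait EventPublisher {
    fn publish(&self, subject: &str) -> Result<(), String>;
}

/// Hypothetical publisher that only counts how often it was called.
#[derive(Default)]
pub struct CountingPublisher {
    pub calls: AtomicUsize,
}

impl EventPublisher for CountingPublisher {
    fn publish(&self, _subject: &str) -> Result<(), String> {
        self.calls.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

// Assumed shape of the shared state: a trait object behind an Arc.
#[derive(Clone)]
pub struct AppState {
    pub events: Arc<dyn EventPublisher>,
}

/// Use cases borrow the trait object and never see the Arc.
pub fn unblock_user_sketch(events: &dyn EventPublisher) -> Result<(), String> {
    events.publish("users.unblocked")
}

/// Handler side: `*s.events` dereferences the Arc to `dyn EventPublisher`,
/// and the leading `&` re-borrows it as `&dyn EventPublisher`.
pub fn delete_block_sketch(s: &AppState) -> Result<(), String> {
    unblock_user_sketch(&*s.events)
}
```

Keeping the use case signature at &dyn EventPublisher means tests can pass a stack-allocated fake directly, without wrapping it in an Arc.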
- Run: cargo test -p application
  Expected: all tests pass, including the 2 new ones.
- Run: cargo check --workspace
  Expected: no errors (the handler change and the event-payload additions must compile).
- Commit:
git add crates/domain/src/events.rs \
crates/adapters/event-payload/src/lib.rs \
crates/application/src/use_cases/social.rs \
crates/application/src/use_cases/auth.rs \
crates/presentation/src/handlers/social.rs
git commit -m "feat: UserUnblocked + UserRegistered events, fix unblock_user and register signatures"
Task 2: Reply notifications in worker
Files:
- Modify: crates/worker/src/handlers.rs
- Write the failing tests. Add to the existing #[cfg(test)] mod tests block in crates/worker/src/handlers.rs, after follow_accepted_creates_notification:
#[tokio::test]
async fn reply_creates_notification_for_original_author() {
    let store = TestStore::default();
    let alice = alice(); // author of the original thought
    let bob_id = UserId::new(); // author of the reply
    let original = Thought::new_local(
        ThoughtId::new(), alice.id.clone(),
        Content::new_local("original thought").unwrap(),
        None, Visibility::Public, None, false,
    );
    store.users.lock().unwrap().push(alice.clone());
    store.thoughts.lock().unwrap().push(original.clone());
    let reply_id = ThoughtId::new();
    let handler = NotificationHandler {
        thoughts: Arc::new(store.clone()),
        notifications: Arc::new(store.clone()),
    };
    // ThoughtCreated with in_reply_to_id pointing at alice's thought
    handler.handle(&DomainEvent::ThoughtCreated {
        thought_id: reply_id,
        user_id: bob_id.clone(),
        in_reply_to_id: Some(original.id.clone()),
    }).await.unwrap();
    let notifs = store.notifications.lock().unwrap();
    assert_eq!(notifs.len(), 1);
    assert_eq!(notifs[0].user_id, alice.id);
    assert!(matches!(notifs[0].notification_type, NotificationType::Reply));
}
#[tokio::test]
async fn self_reply_does_not_create_notification() {
    let store = TestStore::default();
    let alice = alice();
    let original = Thought::new_local(
        ThoughtId::new(), alice.id.clone(),
        Content::new_local("original").unwrap(),
        None, Visibility::Public, None, false,
    );
    store.users.lock().unwrap().push(alice.clone());
    store.thoughts.lock().unwrap().push(original.clone());
    let handler = NotificationHandler {
        thoughts: Arc::new(store.clone()),
        notifications: Arc::new(store.clone()),
    };
    handler.handle(&DomainEvent::ThoughtCreated {
        thought_id: ThoughtId::new(),
        user_id: alice.id.clone(), // alice replying to herself
        in_reply_to_id: Some(original.id.clone()),
    }).await.unwrap();
    assert!(store.notifications.lock().unwrap().is_empty());
}
#[tokio::test]
async fn thought_without_reply_to_creates_no_notification() {
    let store = TestStore::default();
    let alice = alice();
    store.users.lock().unwrap().push(alice.clone());
    let handler = NotificationHandler {
        thoughts: Arc::new(store.clone()),
        notifications: Arc::new(store.clone()),
    };
    handler.handle(&DomainEvent::ThoughtCreated {
        thought_id: ThoughtId::new(),
        user_id: alice.id.clone(),
        in_reply_to_id: None, // not a reply
    }).await.unwrap();
    assert!(store.notifications.lock().unwrap().is_empty());
}
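- Run: DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test -p worker
  Expected: reply_creates_notification_for_original_author FAILS (reply handling not implemented). The two negative tests are expected to pass already, because ThoughtCreated currently falls through to the final _ => Ok(()) arm and creates no notification.
- Add the ThoughtCreated arm to NotificationHandler::handle in crates/worker/src/handlers.rs, inserted before the final _ => Ok(()), arm: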
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => {
    let reply_to_id = match in_reply_to_id {
        Some(id) => id,
        None => return Ok(()), // not a reply, so no notification is needed
    };
    let original = match self.thoughts.find_by_id(reply_to_id).await? {
        Some(t) => t,
        None => return Ok(()), // original thought was deleted, skip
    };
    if original.user_id == *user_id { return Ok(()); } // no self-notifications
    self.notifications.save(&Notification {
        id: NotificationId::new(),
        user_id: original.user_id,
        notification_type: NotificationType::Reply,
        from_user_id: Some(user_id.clone()),
        thought_id: Some(thought_id.clone()),
        read: false,
        created_at: Utc::now(),
    }).await
}
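The arm above makes three decisions before saving anything: is this a reply, does the original still exist, and is the replier someone other than the original author. A sketch of that decision as a pure function follows. It assumes Copy-able integer ids, and a HashMap stands in for thoughts.find_by_id; reply_recipient is a hypothetical name, not a function in the worker crate.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real id types (assumptions for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UserId(pub u64);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ThoughtId(pub u64);

/// Returns the user who should receive a Reply notification, if any.
/// `authors` maps a thought id to its author and stands in for the
/// repository lookup done by the real handler.
pub fn reply_recipient(
    authors: &HashMap<ThoughtId, UserId>,
    replier: UserId,
    in_reply_to_id: Option<ThoughtId>,
) -> Option<UserId> {
    // Not a reply: nothing to notify.
    let reply_to_id = in_reply_to_id?;
    // Original thought no longer exists: skip.
    let original_author = *authors.get(&reply_to_id)?;
    // Replying to your own thought: no self-notification.
    if original_author == replier {
        return None;
    }
    Some(original_author)
}
```

The three tests in this task map onto the first, third, and happy-path branches; the deleted-original branch has no test in the plan, so the sketch's fourth case may be worth mirroring.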
- Run: DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test -p worker
  Expected: all 6 tests pass (3 existing + 3 new).
- Run full suite: DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3
  Expected: all tests pass.
- Commit:
git add crates/worker/src/handlers.rs
git commit -m "feat(worker): Reply notification when ThoughtCreated has in_reply_to_id"
Self-Review
Spec coverage:
- ✅ UserUnblocked added to DomainEvent (Task 1)
- ✅ UserRegistered added to DomainEvent (Task 1)
- ✅ Both variants added to EventPayload with subject routing (Task 1)
- ✅ Both variants covered in From<&DomainEvent> and TryFrom (Task 1)
- ✅ unblock_user now accepts events and publishes UserUnblocked (Task 1)
- ✅ register now publishes UserRegistered (Task 1)
- ✅ delete_block handler passes &*s.events (Task 1)
- ✅ ThoughtCreated with in_reply_to_id triggers Reply notification (Task 2)
- ✅ Self-reply suppressed (Task 2)
- ✅ Plain thought (no reply) triggers no notification (Task 2)
Placeholder scan: None.
Type consistency:
- DomainEvent::UserUnblocked { blocker_id: UserId, blocked_id: UserId }: matches the use case publish call and the EventPayload From arm
- DomainEvent::UserRegistered { user_id: UserId }: matches the use case publish call and the EventPayload From arm
- NotificationType::Reply: already exists in domain/src/models/notification.rs
- unblock_user(blocks, events, blocker_id, blocked_id): matches the updated handler call in delete_block
Notes:
- NotificationType::Reply was already defined in domain models (Plan 1), so no domain model change is needed.
- The event-payload all_subjects_are_unique test will catch duplicate NATS subjects; the new subjects "users.unblocked" and "users.registered" are unique.
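For reference, a uniqueness check of the kind the all_subjects_are_unique test performs could look like the sketch below. The actual test body is not shown in this plan, so this is only an assumed shape: the variants are reduced to fieldless stand-ins, subjects_are_unique is a hypothetical helper, and the "users.blocked" subject is a guess at the existing value.

```rust
use std::collections::HashSet;

// Fieldless stand-ins for the real payload variants (assumption).
pub enum EventPayload {
    UserBlocked,
    UserUnblocked,
    UserRegistered,
}

impl EventPayload {
    pub fn subject(&self) -> &'static str {
        match self {
            Self::UserBlocked => "users.blocked", // assumed existing subject
            Self::UserUnblocked => "users.unblocked",
            Self::UserRegistered => "users.registered",
        }
    }
}

/// True when no two payloads share a subject.
/// `HashSet::insert` returns false on a repeat, which stops `all` early.
pub fn subjects_are_unique(payloads: &[EventPayload]) -> bool {
    let mut seen = HashSet::new();
    payloads.iter().all(|p| seen.insert(p.subject()))
}
```

A check like this only covers the variants it is given, so the list passed to it needs to be extended whenever a variant is added.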