Audit Gap Fixes Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Close the three gaps found in the architectural audit: unblock_user does not publish a UserUnblocked event, register does not publish a UserRegistered event, and the worker does not create Reply notifications when a thought is a reply.

Architecture: Two new DomainEvent variants (UserUnblocked, UserRegistered) ripple through the event pipeline: added to events.rs, serialised in event-payload, published in the affected use cases. The worker NotificationHandler gains a new arm for ThoughtCreated with an in_reply_to_id.
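The pipeline described above can be sketched in isolation as a round trip between the two enums. This is a minimal, std-only illustration rather than the crates' actual code: `UserId` is a stand-in wrapping a `u128` instead of a UUID, `parse_id`, `decode`, and `round_trip` are hypothetical helpers, and the error type is a plain `String`.

```rust
use std::convert::TryFrom;

/// Stand-in for the domain `UserId`; the real type wraps a UUID (assumption: u128 here).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UserId(pub u128);

/// Domain side: typed identifiers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DomainEvent {
    UserUnblocked { blocker_id: UserId, blocked_id: UserId },
    UserRegistered { user_id: UserId },
}

/// Wire side: identifiers flattened to strings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventPayload {
    UserUnblocked { blocker_id: String, blocked_id: String },
    UserRegistered { user_id: String },
}

impl EventPayload {
    /// Routing subject for each variant.
    pub fn subject(&self) -> &'static str {
        match self {
            Self::UserUnblocked { .. } => "users.unblocked",
            Self::UserRegistered { .. } => "users.registered",
        }
    }
}

impl From<&DomainEvent> for EventPayload {
    fn from(event: &DomainEvent) -> Self {
        match event {
            DomainEvent::UserUnblocked { blocker_id, blocked_id } => Self::UserUnblocked {
                blocker_id: blocker_id.0.to_string(),
                blocked_id: blocked_id.0.to_string(),
            },
            DomainEvent::UserRegistered { user_id } => Self::UserRegistered {
                user_id: user_id.0.to_string(),
            },
        }
    }
}

/// Hypothetical helper: parse an id and name the offending field on failure.
fn parse_id(raw: &str, field: &str) -> Result<UserId, String> {
    raw.parse::<u128>()
        .map(UserId)
        .map_err(|e| format!("invalid {}: {}", field, e))
}

impl TryFrom<EventPayload> for DomainEvent {
    type Error = String;

    fn try_from(payload: EventPayload) -> Result<Self, Self::Error> {
        Ok(match payload {
            EventPayload::UserUnblocked { blocker_id, blocked_id } => DomainEvent::UserUnblocked {
                blocker_id: parse_id(&blocker_id, "blocker_id")?,
                blocked_id: parse_id(&blocked_id, "blocked_id")?,
            },
            EventPayload::UserRegistered { user_id } => DomainEvent::UserRegistered {
                user_id: parse_id(&user_id, "user_id")?,
            },
        })
    }
}

/// Hypothetical helper: wire form back to the domain form.
pub fn decode(payload: EventPayload) -> Result<DomainEvent, String> {
    DomainEvent::try_from(payload)
}

/// Hypothetical helper: domain -> payload -> domain.
pub fn round_trip(event: &DomainEvent) -> Result<DomainEvent, String> {
    decode(EventPayload::from(event))
}
```

The conversion is infallible going out and fallible coming back, because only the wire form can carry a malformed identifier.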

Tech Stack: Rust, existing domain/event-payload/application/worker crates


File Map

Modify: crates/domain/src/events.rs                           ← add UserUnblocked + UserRegistered variants
Modify: crates/adapters/event-payload/src/lib.rs              ← add variants + From<&DomainEvent> + TryFrom arms
Modify: crates/application/src/use_cases/social.rs            ← unblock_user accepts events, publishes UserUnblocked
Modify: crates/application/src/use_cases/auth.rs              ← register publishes UserRegistered
Modify: crates/presentation/src/handlers/social.rs            ← delete_block passes &*s.events
Modify: crates/worker/src/handlers.rs                         ← ThoughtCreated arm → Reply notification

Task 1: New DomainEvent variants + event-payload + use case fixes

Files:

  • Modify: crates/domain/src/events.rs

  • Modify: crates/adapters/event-payload/src/lib.rs

  • Modify: crates/application/src/use_cases/social.rs

  • Modify: crates/application/src/use_cases/auth.rs

  • Modify: crates/presentation/src/handlers/social.rs

  • Write failing tests — add to crates/application/src/use_cases/social.rs test module (bottom of existing #[cfg(test)] mod tests):

    #[tokio::test]
    async fn unblock_user_publishes_event() {
        let store = TestStore::default();
        let alice = user("alice");
        let bob   = user("bob");
        // block first so we can unblock
        block_user(&store, &store, &alice.id, &bob.id).await.unwrap();
        store.events.lock().unwrap().clear(); // reset after block event
        unblock_user(&store, &store, &alice.id, &bob.id).await.unwrap();
        let events = store.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(events[0], DomainEvent::UserUnblocked { .. }));
    }

Add to crates/application/src/use_cases/auth.rs test module:

    #[tokio::test]
    async fn register_publishes_user_registered_event() {
        let store = TestStore::default();
        register(&store, &FakeHasher, &FakeAuth, &store, input()).await.unwrap();
        let events = store.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(events[0], DomainEvent::UserRegistered { .. }));
    }

Note: in the auth test, &store is passed as the events argument (TestStore implements EventPublisher). The existing tests use &NoOpEventPublisher — leave those unchanged, they still pass. Only the new test passes &store to capture events.

  • Run: cargo test -p application. Expected: FAIL at compile time (UserUnblocked and UserRegistered are not defined, and unblock_user does not yet accept an events argument).

  • Add variants to crates/domain/src/events.rs — append two variants to the DomainEvent enum, after UserBlocked:

    UserUnblocked { blocker_id: UserId, blocked_id: UserId },
    UserRegistered { user_id: UserId },

  • Add variants to crates/adapters/event-payload/src/lib.rs:

In the EventPayload enum — append after UserBlocked:

    UserUnblocked {
        blocker_id: String,
        blocked_id: String,
    },
    UserRegistered {
        user_id: String,
    },

In subject() — append after the Self::UserBlocked arm:

            Self::UserUnblocked { .. } => "users.unblocked",
            Self::UserRegistered { .. } => "users.registered",

In impl From<&DomainEvent> for EventPayload — append after the DomainEvent::UserBlocked arm:

            DomainEvent::UserUnblocked { blocker_id, blocked_id } => Self::UserUnblocked {
                blocker_id: blocker_id.to_string(),
                blocked_id: blocked_id.to_string(),
            },
            DomainEvent::UserRegistered { user_id } => Self::UserRegistered {
                user_id: user_id.to_string(),
            },

In impl TryFrom<EventPayload> for DomainEvent — append after the EventPayload::UserBlocked arm:

            EventPayload::UserUnblocked { blocker_id, blocked_id } => DomainEvent::UserUnblocked {
                blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
                blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
            },
            EventPayload::UserRegistered { user_id } => DomainEvent::UserRegistered {
                user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
            },

  • Update unblock_user in crates/application/src/use_cases/social.rs:

Replace the current function (which takes only blocks and two UserId params):

pub async fn unblock_user(blocks: &dyn BlockRepository, blocker_id: &UserId, blocked_id: &UserId) -> Result<(), DomainError> {
    blocks.delete(blocker_id, blocked_id).await?;
    Ok(())
}

With:

pub async fn unblock_user(
    blocks: &dyn BlockRepository,
    events: &dyn EventPublisher,
    blocker_id: &UserId,
    blocked_id: &UserId,
) -> Result<(), DomainError> {
    blocks.delete(blocker_id, blocked_id).await?;
    events.publish(&DomainEvent::UserUnblocked {
        blocker_id: blocker_id.clone(),
        blocked_id: blocked_id.clone(),
    }).await?;
    Ok(())
}

  • Update register in crates/application/src/use_cases/auth.rs:

Change the parameter from _events to events (remove the underscore) and add one line after users.save(&user).await?;:

pub async fn register(
    users: &dyn UserRepository,
    hasher: &dyn PasswordHasher,
    auth: &dyn AuthService,
    events: &dyn EventPublisher,    // ← remove leading underscore
    input: RegisterInput,
) -> Result<RegisterOutput, DomainError> {
    let username = Username::new(input.username)?;
    let email = Email::new(input.email)?;
    if users.find_by_username(&username).await?.is_some() {
        return Err(DomainError::Conflict("username taken".into()));
    }
    if users.find_by_email(&email).await?.is_some() {
        return Err(DomainError::Conflict("email taken".into()));
    }
    let hash = hasher.hash(&input.password).await?;
    let user = User::new_local(UserId::new(), username, email, hash);
    users.save(&user).await?;
    events.publish(&DomainEvent::UserRegistered { user_id: user.id.clone() }).await?;  // ← new
    let token = auth.generate_token(&user.id)?;
    Ok(RegisterOutput { user, token: token.token })
}

  • Update delete_block handler in crates/presentation/src/handlers/social.rs:

The handler currently calls unblock_user(&*s.blocks, &uid, &UserId::from_uuid(target)). Add &*s.events as the second argument:

pub async fn delete_block(State(s): State<AppState>, AuthUser(uid): AuthUser, Path(target): Path<Uuid>) -> Result<StatusCode, ApiError> {
    unblock_user(&*s.blocks, &*s.events, &uid, &UserId::from_uuid(target)).await?;
    Ok(StatusCode::NO_CONTENT)
}
  • Run: cargo test -p application — Expected: all tests pass including 2 new ones.

  • Run: cargo check --workspace — Expected: no errors (the handler change + event-payload additions must compile).

  • Commit:

git add crates/domain/src/events.rs \
        crates/adapters/event-payload/src/lib.rs \
        crates/application/src/use_cases/social.rs \
        crates/application/src/use_cases/auth.rs \
        crates/presentation/src/handlers/social.rs
git commit -m "feat: UserUnblocked + UserRegistered events, fix unblock_user and register signatures"
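
The write-then-publish pattern that Task 1 applies to unblock_user can be sketched without the async runtime or the real ports. Everything here is a simplified assumption for illustration: `Publisher` and `Blocks` are synchronous stand-ins for `EventPublisher` and `BlockRepository`, ids are plain `u64`, errors are `String`, and `RecordingPublisher`, `InMemoryBlocks`, and `FailingBlocks` are hypothetical test doubles in the spirit of the plan's `TestStore`.

```rust
use std::cell::RefCell;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    UserUnblocked { blocker_id: u64, blocked_id: u64 },
}

/// Simplified, synchronous stand-in for the `EventPublisher` port.
pub trait Publisher {
    fn publish(&self, event: &Event) -> Result<(), String>;
}

/// Test double that records every event it is asked to publish.
#[derive(Default)]
pub struct RecordingPublisher {
    pub events: RefCell<Vec<Event>>,
}

impl Publisher for RecordingPublisher {
    fn publish(&self, event: &Event) -> Result<(), String> {
        self.events.borrow_mut().push(event.clone());
        Ok(())
    }
}

/// Simplified stand-in for the `BlockRepository` port.
pub trait Blocks {
    fn delete(&self, blocker_id: u64, blocked_id: u64) -> Result<(), String>;
}

/// Hypothetical in-memory block list.
#[derive(Default)]
pub struct InMemoryBlocks {
    pub pairs: RefCell<Vec<(u64, u64)>>,
}

impl Blocks for InMemoryBlocks {
    fn delete(&self, blocker_id: u64, blocked_id: u64) -> Result<(), String> {
        self.pairs.borrow_mut().retain(|p| *p != (blocker_id, blocked_id));
        Ok(())
    }
}

/// Hypothetical repository whose delete always fails.
pub struct FailingBlocks;

impl Blocks for FailingBlocks {
    fn delete(&self, _blocker_id: u64, _blocked_id: u64) -> Result<(), String> {
        Err("storage unavailable".to_string())
    }
}

/// Delete first, publish second: a failed delete returns early and emits nothing.
pub fn unblock_user(
    blocks: &dyn Blocks,
    events: &dyn Publisher,
    blocker_id: u64,
    blocked_id: u64,
) -> Result<(), String> {
    blocks.delete(blocker_id, blocked_id)?;
    events.publish(&Event::UserUnblocked { blocker_id, blocked_id })
}
```

Because the publish call comes after the `?` on the delete, subscribers should only see UserUnblocked for a delete that succeeded. The reverse failure (delete succeeds, publish fails) still surfaces as an error to the caller in this sketch, as it does in the plan's version.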

Task 2: Reply notifications in worker

Files:

  • Modify: crates/worker/src/handlers.rs

  • Write the failing test — add to the existing #[cfg(test)] mod tests block in crates/worker/src/handlers.rs, after follow_accepted_creates_notification:

    #[tokio::test]
    async fn reply_creates_notification_for_original_author() {
        let store = TestStore::default();
        let alice = alice(); // author of the original thought
        let bob_id = UserId::new(); // author of the reply

        let original = Thought::new_local(
            ThoughtId::new(), alice.id.clone(),
            Content::new_local("original thought").unwrap(),
            None, Visibility::Public, None, false,
        );
        store.users.lock().unwrap().push(alice.clone());
        store.thoughts.lock().unwrap().push(original.clone());

        let reply_id = ThoughtId::new();
        let handler = NotificationHandler {
            thoughts:      Arc::new(store.clone()),
            notifications: Arc::new(store.clone()),
        };

        // ThoughtCreated with in_reply_to_id pointing at alice's thought
        handler.handle(&DomainEvent::ThoughtCreated {
            thought_id: reply_id,
            user_id: bob_id.clone(),
            in_reply_to_id: Some(original.id.clone()),
        }).await.unwrap();

        let notifs = store.notifications.lock().unwrap();
        assert_eq!(notifs.len(), 1);
        assert_eq!(notifs[0].user_id, alice.id);
        assert!(matches!(notifs[0].notification_type, NotificationType::Reply));
    }

    #[tokio::test]
    async fn self_reply_does_not_create_notification() {
        let store = TestStore::default();
        let alice = alice();
        let original = Thought::new_local(
            ThoughtId::new(), alice.id.clone(),
            Content::new_local("original").unwrap(),
            None, Visibility::Public, None, false,
        );
        store.users.lock().unwrap().push(alice.clone());
        store.thoughts.lock().unwrap().push(original.clone());

        let handler = NotificationHandler {
            thoughts:      Arc::new(store.clone()),
            notifications: Arc::new(store.clone()),
        };

        handler.handle(&DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: alice.id.clone(), // alice replying to herself
            in_reply_to_id: Some(original.id.clone()),
        }).await.unwrap();

        assert!(store.notifications.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn thought_without_reply_to_creates_no_notification() {
        let store = TestStore::default();
        let alice = alice();
        store.users.lock().unwrap().push(alice.clone());

        let handler = NotificationHandler {
            thoughts:      Arc::new(store.clone()),
            notifications: Arc::new(store.clone()),
        };

        handler.handle(&DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: alice.id.clone(),
            in_reply_to_id: None, // not a reply
        }).await.unwrap();

        assert!(store.notifications.lock().unwrap().is_empty());
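    }

  • Run: DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test -p worker. Expected: reply_creates_notification_for_original_author FAILS (reply handling not implemented). The two negative tests, self_reply_does_not_create_notification and thought_without_reply_to_creates_no_notification, already pass at this point because ThoughtCreated still falls through to the catch-all _ => Ok(()) arm.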

  • Add the ThoughtCreated arm to NotificationHandler::handle in crates/worker/src/handlers.rs — insert before the final _ => Ok(()), arm:

            DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => {
                let reply_to_id = match in_reply_to_id {
                    Some(id) => id,
                    None => return Ok(()), // not a reply — no notification needed
                };
                let original = match self.thoughts.find_by_id(reply_to_id).await? {
                    Some(t) => t,
                    None => return Ok(()), // original thought deleted — skip
                };
                if original.user_id == *user_id { return Ok(()); } // no self-notifications
                self.notifications.save(&Notification {
                    id: NotificationId::new(),
                    user_id: original.user_id,
                    notification_type: NotificationType::Reply,
                    from_user_id: Some(user_id.clone()),
                    thought_id: Some(thought_id.clone()),
                    read: false,
                    created_at: Utc::now(),
                }).await
            }

  • Run: DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test -p worker. Expected: all 6 tests pass (3 existing + 3 new).

  • Run full suite: DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3 — Expected: all tests pass.

  • Commit:

git add crates/worker/src/handlers.rs
git commit -m "feat(worker): Reply notification when ThoughtCreated has in_reply_to_id"
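
The three early returns in the ThoughtCreated arm reduce to one decision: who, if anyone, receives the Reply notification. A minimal sketch of that decision as a pure function follows. `reply_recipient` is a hypothetical helper that is not part of the plan, ids are plain `u64`, and a `HashMap` from thought id to author id stands in for the thought repository.

```rust
use std::collections::HashMap;

/// Hypothetical pure helper: returns the user who should receive a Reply
/// notification, or `None` when no notification is needed.
/// `authors` maps thought id -> author id, standing in for the thought repository.
pub fn reply_recipient(
    authors: &HashMap<u64, u64>,
    replier_id: u64,
    in_reply_to_id: Option<u64>,
) -> Option<u64> {
    // Not a reply: nothing to notify.
    let parent_id = in_reply_to_id?;
    // Parent thought no longer exists: skip.
    let author_id = *authors.get(&parent_id)?;
    // Replying to your own thought: no self-notification.
    if author_id == replier_id {
        return None;
    }
    Some(author_id)
}
```

Keeping the decision separate from the repository and notification writes would let the three cases from the tests above, plus the deleted-parent case, be checked without any async setup. Whether that extraction is worth it for a single match arm is a judgment call.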

Self-Review

Spec coverage:

  • UserUnblocked added to DomainEvent (Task 1)
  • UserRegistered added to DomainEvent (Task 1)
  • Both variants added to EventPayload with subject routing (Task 1)
  • Both variants covered in From<&DomainEvent> and TryFrom (Task 1)
  • unblock_user now accepts events and publishes UserUnblocked (Task 1)
  • register now publishes UserRegistered (Task 1)
  • delete_block handler passes &*s.events (Task 1)
  • ThoughtCreated with in_reply_to_id triggers Reply notification (Task 2)
  • Self-reply suppressed (Task 2)
  • Plain thought (no reply) triggers no notification (Task 2)

Placeholder scan: None.

Type consistency:

  • DomainEvent::UserUnblocked { blocker_id: UserId, blocked_id: UserId } — matches use case publish call and EventPayload From arm
  • DomainEvent::UserRegistered { user_id: UserId } — matches use case publish call and EventPayload From arm
  • NotificationType::Reply — already exists in domain/src/models/notification.rs
  • unblock_user(blocks, events, blocker_id, blocked_id) — matches updated handler call in delete_block

Notes:

  • NotificationType::Reply was already defined in domain models (Plan 1) — no domain model change needed
  • The event-payload all_subjects_are_unique test will catch duplicate NATS subjects — the new subjects "users.unblocked" and "users.registered" are unique
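
For readers unfamiliar with that kind of check, a subject-uniqueness test can be as small as the sketch below. This is not the existing all_subjects_are_unique test, whose body is not shown in this plan. `first_duplicate` is a hypothetical helper that takes a list of subject strings and reports the first repeated one.

```rust
use std::collections::HashSet;

/// Hypothetical helper: returns the first subject that appears more than once, if any.
pub fn first_duplicate<'a>(subjects: &[&'a str]) -> Option<&'a str> {
    let mut seen = HashSet::new();
    // `insert` returns false when the value was already present.
    subjects.iter().copied().find(|s| !seen.insert(*s))
}
```

Called with "users.unblocked" and "users.registered" it returns `None`. Appending a second "users.unblocked" makes it return that subject.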