thoughts/docs/superpowers/plans/2026-05-14-v2-plan3-events.md

Thoughts v2 — Plan 3: Events + Worker (NATS)

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Wire real async event processing — use cases publish domain events to NATS, a worker binary subscribes and runs handlers (NotificationHandler creates DB records; FederationHandler is stubbed for Plan 4).

Architecture: event-payload/ holds the serializable NATS wire types. nats/ wraps async-nats and implements both EventPublisher (publish to NATS) and EventConsumer (subscribe, yield EventEnvelope stream). worker/ is a standalone binary that consumes events and dispatches to handlers. presentation/ swaps its NoOpEventPublisher for the real NATS publisher. event-publisher/ stays a stub (future fan-out to multiple backends).

Tech Stack: Rust, async-nats 0.38, serde_json, futures, async-stream, tokio

Prerequisites: NATS server running locally. Start with:

docker run -d --name nats -p 4222:4222 nats:latest
# or add to docker-compose if preferred

File Map

Modified: Cargo.toml                                     ← add async-nats, async-stream to workspace.dependencies
Modified: crates/adapters/event-payload/Cargo.toml       ← add deps
Modified: crates/adapters/event-payload/src/lib.rs       ← EventPayload enum + subject()
Modified: crates/adapters/nats/Cargo.toml                ← add deps
Modified: crates/adapters/nats/src/lib.rs                ← NatsEventPublisher + NatsEventConsumer + DomainEvent/EventPayload conversions
Modified: crates/worker/Cargo.toml                       ← add deps, add [[bin]]
Create:   crates/worker/src/handlers.rs                  ← NotificationHandler, FederationHandler (stub)
Modified: crates/worker/src/main.rs                      ← consumer loop binary
Modified: crates/presentation/src/lib.rs                 ← swap NoOp for NatsEventPublisher
Modified: crates/presentation/src/main.rs                ← await the now-async build_state
Modified: crates/presentation/Cargo.toml                 ← add nats dep

Task 1: Workspace deps + event-payload crate

Files:

  • Modify: Cargo.toml (root workspace)

  • Modify: crates/adapters/event-payload/Cargo.toml

  • Modify: crates/adapters/event-payload/src/lib.rs

  • Add to root Cargo.toml [workspace.dependencies]:

async-nats   = "0.38"
async-stream = "0.3"

event-payload   = { path = "crates/adapters/event-payload" }
event-publisher = { path = "crates/adapters/event-publisher" }
nats            = { path = "crates/adapters/nats" }

Check if event-payload, event-publisher, nats are already listed — they should be from Plan 1 scaffolding. If so, skip those lines and only add async-nats and async-stream.

  • Write crates/adapters/event-payload/Cargo.toml:
[package]
name = "event-payload"
version = "0.1.0"
edition = "2021"

[dependencies]
serde      = { workspace = true }
serde_json = { workspace = true }
  • Write crates/adapters/event-payload/src/lib.rs:
use serde::{Deserialize, Serialize};

/// Serializable mirror of domain::events::DomainEvent.
/// All IDs are Strings (UUID hex) — no domain type dependencies.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum EventPayload {
    ThoughtCreated {
        thought_id: String,
        user_id: String,
        in_reply_to_id: Option<String>,
    },
    ThoughtDeleted {
        thought_id: String,
        user_id: String,
    },
    ThoughtUpdated {
        thought_id: String,
        user_id: String,
    },
    LikeAdded {
        like_id: String,
        user_id: String,
        thought_id: String,
    },
    LikeRemoved {
        user_id: String,
        thought_id: String,
    },
    BoostAdded {
        boost_id: String,
        user_id: String,
        thought_id: String,
    },
    BoostRemoved {
        user_id: String,
        thought_id: String,
    },
    FollowRequested {
        follower_id: String,
        following_id: String,
    },
    FollowAccepted {
        follower_id: String,
        following_id: String,
    },
    FollowRejected {
        follower_id: String,
        following_id: String,
    },
    Unfollowed {
        follower_id: String,
        following_id: String,
    },
    UserBlocked {
        blocker_id: String,
        blocked_id: String,
    },
}

impl EventPayload {
    /// Returns the NATS subject for this event.
    pub fn subject(&self) -> &'static str {
        match self {
            Self::ThoughtCreated { .. }  => "thoughts.created",
            Self::ThoughtDeleted { .. }  => "thoughts.deleted",
            Self::ThoughtUpdated { .. }  => "thoughts.updated",
            Self::LikeAdded { .. }       => "likes.added",
            Self::LikeRemoved { .. }     => "likes.removed",
            Self::BoostAdded { .. }      => "boosts.added",
            Self::BoostRemoved { .. }    => "boosts.removed",
            Self::FollowRequested { .. } => "follows.requested",
            Self::FollowAccepted { .. }  => "follows.accepted",
            Self::FollowRejected { .. }  => "follows.rejected",
            Self::Unfollowed { .. }      => "follows.removed",
            Self::UserBlocked { .. }     => "users.blocked",
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn thought_created_roundtrip() {
        let p = EventPayload::ThoughtCreated {
            thought_id: "abc".into(),
            user_id: "def".into(),
            in_reply_to_id: None,
        };
        let json = serde_json::to_string(&p).unwrap();
        let back: EventPayload = serde_json::from_str(&json).unwrap();
        assert_eq!(back.subject(), "thoughts.created");
    }

    #[test]
    fn all_subjects_are_unique() {
        let samples: &[EventPayload] = &[
            EventPayload::ThoughtCreated { thought_id: "a".into(), user_id: "b".into(), in_reply_to_id: None },
            EventPayload::ThoughtDeleted { thought_id: "a".into(), user_id: "b".into() },
            EventPayload::ThoughtUpdated { thought_id: "a".into(), user_id: "b".into() },
            EventPayload::LikeAdded { like_id: "a".into(), user_id: "b".into(), thought_id: "c".into() },
            EventPayload::LikeRemoved { user_id: "b".into(), thought_id: "c".into() },
            EventPayload::BoostAdded { boost_id: "a".into(), user_id: "b".into(), thought_id: "c".into() },
            EventPayload::BoostRemoved { user_id: "b".into(), thought_id: "c".into() },
            EventPayload::FollowRequested { follower_id: "a".into(), following_id: "b".into() },
            EventPayload::FollowAccepted { follower_id: "a".into(), following_id: "b".into() },
            EventPayload::FollowRejected { follower_id: "a".into(), following_id: "b".into() },
            EventPayload::Unfollowed { follower_id: "a".into(), following_id: "b".into() },
            EventPayload::UserBlocked { blocker_id: "a".into(), blocked_id: "b".into() },
        ];
        let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect();
        subjects.sort();
        subjects.dedup();
        assert_eq!(subjects.len(), samples.len(), "each event must have a unique subject");
    }
}
  • Run: cargo test -p event-payload. Expected: 2 tests pass.

  • Commit:

git add Cargo.toml crates/adapters/event-payload/
git commit -m "feat(event-payload): serializable NATS event payload types"
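
The subjects returned by subject() follow the NATS convention of dot-separated tokens, which is what lets a subscriber select events by wildcard: `*` matches exactly one token and `>` matches one or more trailing tokens. The std-only sketch below models that matching rule to show which of this plan's subjects a given subscription would receive. subject_matches is a hypothetical helper written for illustration; it is not part of async-nats, and the server does this matching itself.

```rust
/// Models NATS subject matching for illustration only.
/// `*` matches exactly one token; `>` matches one or more trailing tokens
/// and is only valid as the last token of the pattern.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` swallows the rest of the subject, but must end the pattern.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` accepts any single token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal token.
            _ => return false,
        }
    }
}
```

Under this rule a subscription to `thoughts.*` would select the three thought events, `follows.>` would select every follow event, and a bare `>` selects every subject published on the server.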

Task 2: nats crate — NatsEventPublisher + NatsEventConsumer

Files:

  • Modify: crates/adapters/nats/Cargo.toml

  • Modify: crates/adapters/nats/src/lib.rs

  • Write crates/adapters/nats/Cargo.toml:

[package]
name = "nats"
version = "0.1.0"
edition = "2021"

[dependencies]
domain        = { workspace = true }
event-payload = { workspace = true }
async-nats    = { workspace = true }
async-stream  = { workspace = true }
serde_json    = { workspace = true }
futures       = { workspace = true }
tokio         = { workspace = true }
async-trait   = { workspace = true }
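tracing       = { workspace = true }
uuid          = { workspace = true }  # used by parse_uuid; assumes uuid is already in [workspace.dependencies]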
  • Write test at bottom of crates/adapters/nats/src/lib.rs:
#[cfg(test)]
mod tests {
    use super::*;
    use domain::value_objects::{ThoughtId, UserId};

    #[test]
    fn payload_from_domain_event_has_correct_subject() {
        let event = domain::events::DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        };
        let payload = EventPayload::from(&event);
        assert_eq!(payload.subject(), "thoughts.created");
    }

    #[test]
    fn domain_event_roundtrip_via_payload() {
        let uid = UserId::new();
        let tid = ThoughtId::new();
        let event = domain::events::DomainEvent::LikeAdded {
            like_id: domain::value_objects::LikeId::new(),
            user_id: uid.clone(),
            thought_id: tid.clone(),
        };
        let payload = EventPayload::from(&event);
        let back = domain::events::DomainEvent::try_from(payload).unwrap();
        if let domain::events::DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
            assert_eq!(user_id, uid);
            assert_eq!(thought_id, tid);
        } else {
            panic!("wrong variant");
        }
    }
}
  • Run: cargo test -p nats — Expected: FAIL (lib.rs is empty).

  • Write crates/adapters/nats/src/lib.rs:

use async_trait::async_trait;
use domain::{
    errors::DomainError,
    events::{DomainEvent, EventEnvelope},
    ports::{EventConsumer, EventPublisher},
    value_objects::{BoostId, LikeId, ThoughtId, UserId},
};
use event_payload::EventPayload;
use futures::stream::BoxStream;

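// ── DomainEvent → EventPayload ─────────────────────────────────────────────
// NOTE: `DomainEvent` (domain) and `EventPayload` (event-payload) are both
// foreign to this crate, so rustc's orphan rule (E0117) rejects the `From` and
// `TryFrom` impls below as written. To compile, host both impls in a crate that
// defines one of the two types, or convert them to free functions here.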

impl From<&DomainEvent> for EventPayload {
    fn from(e: &DomainEvent) -> Self {
        match e {
            DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => Self::ThoughtCreated {
                thought_id: thought_id.to_string(),
                user_id: user_id.to_string(),
                in_reply_to_id: in_reply_to_id.as_ref().map(|x| x.to_string()),
            },
            DomainEvent::ThoughtDeleted { thought_id, user_id } => Self::ThoughtDeleted {
                thought_id: thought_id.to_string(), user_id: user_id.to_string(),
            },
            DomainEvent::ThoughtUpdated { thought_id, user_id } => Self::ThoughtUpdated {
                thought_id: thought_id.to_string(), user_id: user_id.to_string(),
            },
            DomainEvent::LikeAdded { like_id, user_id, thought_id } => Self::LikeAdded {
                like_id: like_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(),
            },
            DomainEvent::LikeRemoved { user_id, thought_id } => Self::LikeRemoved {
                user_id: user_id.to_string(), thought_id: thought_id.to_string(),
            },
            DomainEvent::BoostAdded { boost_id, user_id, thought_id } => Self::BoostAdded {
                boost_id: boost_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(),
            },
            DomainEvent::BoostRemoved { user_id, thought_id } => Self::BoostRemoved {
                user_id: user_id.to_string(), thought_id: thought_id.to_string(),
            },
            DomainEvent::FollowRequested { follower_id, following_id } => Self::FollowRequested {
                follower_id: follower_id.to_string(), following_id: following_id.to_string(),
            },
            DomainEvent::FollowAccepted { follower_id, following_id } => Self::FollowAccepted {
                follower_id: follower_id.to_string(), following_id: following_id.to_string(),
            },
            DomainEvent::FollowRejected { follower_id, following_id } => Self::FollowRejected {
                follower_id: follower_id.to_string(), following_id: following_id.to_string(),
            },
            DomainEvent::Unfollowed { follower_id, following_id } => Self::Unfollowed {
                follower_id: follower_id.to_string(), following_id: following_id.to_string(),
            },
            DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked {
                blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(),
            },
        }
    }
}

// ── EventPayload → DomainEvent ─────────────────────────────────────────────

fn parse_uuid(s: &str, field: &str) -> Result<uuid::Uuid, DomainError> {
    uuid::Uuid::parse_str(s)
        .map_err(|_| DomainError::Internal(format!("invalid uuid for {field}: {s}")))
}

impl TryFrom<EventPayload> for DomainEvent {
    type Error = DomainError;

    fn try_from(p: EventPayload) -> Result<Self, DomainError> {
        Ok(match p {
            EventPayload::ThoughtCreated { thought_id, user_id, in_reply_to_id } => DomainEvent::ThoughtCreated {
                thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
                user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
                in_reply_to_id: in_reply_to_id
                    .map(|s| parse_uuid(&s, "in_reply_to_id").map(ThoughtId::from_uuid))
                    .transpose()?,
            },
            EventPayload::ThoughtDeleted { thought_id, user_id } => DomainEvent::ThoughtDeleted {
                thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
                user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
            },
            EventPayload::ThoughtUpdated { thought_id, user_id } => DomainEvent::ThoughtUpdated {
                thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
                user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
            },
            EventPayload::LikeAdded { like_id, user_id, thought_id } => DomainEvent::LikeAdded {
                like_id: LikeId::from_uuid(parse_uuid(&like_id, "like_id")?),
                user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
                thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
            },
            EventPayload::LikeRemoved { user_id, thought_id } => DomainEvent::LikeRemoved {
                user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
                thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
            },
            EventPayload::BoostAdded { boost_id, user_id, thought_id } => DomainEvent::BoostAdded {
                boost_id: BoostId::from_uuid(parse_uuid(&boost_id, "boost_id")?),
                user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
                thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
            },
            EventPayload::BoostRemoved { user_id, thought_id } => DomainEvent::BoostRemoved {
                user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
                thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
            },
            EventPayload::FollowRequested { follower_id, following_id } => DomainEvent::FollowRequested {
                follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
                following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
            },
            EventPayload::FollowAccepted { follower_id, following_id } => DomainEvent::FollowAccepted {
                follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
                following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
            },
            EventPayload::FollowRejected { follower_id, following_id } => DomainEvent::FollowRejected {
                follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
                following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
            },
            EventPayload::Unfollowed { follower_id, following_id } => DomainEvent::Unfollowed {
                follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
                following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
            },
            EventPayload::UserBlocked { blocker_id, blocked_id } => DomainEvent::UserBlocked {
                blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
                blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
            },
        })
    }
}

// ── NatsEventPublisher ────────────────────────────────────────────────────

pub struct NatsEventPublisher {
    client: async_nats::Client,
}

impl NatsEventPublisher {
    pub fn new(client: async_nats::Client) -> Self { Self { client } }
}

#[async_trait]
impl EventPublisher for NatsEventPublisher {
    async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
        let payload = EventPayload::from(event);
        let subject = payload.subject();
        let bytes = serde_json::to_vec(&payload)
            .map_err(|e| DomainError::Internal(e.to_string()))?;
        self.client
            .publish(subject, bytes.into())
            .await
            .map_err(|e| DomainError::Internal(e.to_string()))
    }
}

// ── NatsEventConsumer ─────────────────────────────────────────────────────

pub struct NatsEventConsumer {
    client: async_nats::Client,
}

impl NatsEventConsumer {
    pub fn new(client: async_nats::Client) -> Self { Self { client } }
}

impl EventConsumer for NatsEventConsumer {
    fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
        let client = self.client.clone();
        Box::pin(async_stream::try_stream! {
            let mut sub = client
                .subscribe(">")
                .await
                .map_err(|e| DomainError::Internal(e.to_string()))?;

            use futures::StreamExt;
            while let Some(msg) = sub.next().await {
                let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
                    Ok(p) => p,
                    Err(e) => {
                        tracing::warn!("failed to deserialize event payload: {e}");
                        continue;
                    }
                };
                let event = match DomainEvent::try_from(payload) {
                    Ok(e) => e,
                    Err(e) => {
                        tracing::warn!("failed to convert payload to domain event: {e}");
                        continue;
                    }
                };
                // Basic NATS has no ack/nack — at-most-once delivery
                yield EventEnvelope {
                    event,
                    ack:  Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
            }
        })
    }
}
  • Run: cargo test -p nats. Expected: 2 tests pass.

  • Commit:

git add crates/adapters/nats/
git commit -m "feat(nats): NatsEventPublisher and NatsEventConsumer with payload conversion"
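
The optional in_reply_to_id conversion in the TryFrom impl relies on Option::transpose to turn an Option<Result<_, _>> into a Result<Option<_>, _>, so a missing ID stays None while a malformed one aborts the whole conversion. A std-only sketch of the same pattern follows, using u32 parsing in place of UUID parsing. parse_id and parse_optional_id are illustrative names, not functions from the plan's crates.

```rust
/// Parses a required numeric ID, naming the offending field on failure.
fn parse_id(s: &str, field: &str) -> Result<u32, String> {
    s.parse::<u32>()
        .map_err(|_| format!("invalid id for {field}: {s}"))
}

/// Parses an optional ID: `None` passes through, `Some` must be valid.
fn parse_optional_id(s: Option<&str>, field: &str) -> Result<Option<u32>, String> {
    // `map` yields Option<Result<u32, String>>; `transpose` flips it to
    // Result<Option<u32>, String> so `?` can propagate a parse failure.
    let id = s.map(|raw| parse_id(raw, field)).transpose()?;
    Ok(id)
}
```

The consumer applies the same idea per message: a payload with one malformed ID is logged and skipped rather than partially converted.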

Task 3: worker — NotificationHandler + FederationHandler

Files:

  • Modify: crates/worker/Cargo.toml

  • Create: crates/worker/src/handlers.rs

  • Write crates/worker/Cargo.toml:

[package]
name = "worker"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "thoughts-worker"
path = "src/main.rs"

[dependencies]
domain        = { workspace = true }
nats          = { workspace = true }
event-payload = { workspace = true }
postgres      = { workspace = true }
async-nats    = { workspace = true }
tokio         = { workspace = true, features = ["full"] }
futures       = { workspace = true }
async-trait   = { workspace = true }
tracing       = { workspace = true }
tracing-subscriber = { workspace = true }
dotenvy       = { workspace = true }
serde_json    = { workspace = true }
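chrono        = { workspace = true }
sqlx          = { workspace = true }  # main.rs uses sqlx::PgPool; assumes sqlx is already in [workspace.dependencies]
  • Write tests at bottom of crates/worker/src/handlers.rs, and declare `mod handlers;` in crates/worker/src/main.rs so the new file is part of the crate and its tests are compiled: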
#[cfg(test)]
mod tests {
    use super::*;
    use domain::{
        models::{thought::{Thought, Visibility}, user::User},
        testing::TestStore,
        value_objects::*,
    };
    use std::sync::Arc;

    fn alice() -> User {
        User::new_local(
            UserId::new(),
            Username::new("alice").unwrap(),
            Email::new("alice@ex.com").unwrap(),
            PasswordHash("h".into()),
        )
    }

    #[tokio::test]
    async fn like_added_creates_notification_for_thought_author() {
        let store = TestStore::default();
        let alice = alice();
        let bob_id = UserId::new();

        // alice posts a thought
        let thought = Thought::new_local(
            ThoughtId::new(), alice.id.clone(),
            Content::new_local("hello").unwrap(),
            None, Visibility::Public, None, false,
        );
        store.users.lock().unwrap().push(alice.clone());
        store.thoughts.lock().unwrap().push(thought.clone());

        let handler = NotificationHandler {
            thoughts: Arc::new(store.clone()),
            notifications: Arc::new(store.clone()),
        };

        // bob likes alice's thought
        handler.handle(&DomainEvent::LikeAdded {
            like_id: LikeId::new(),
            user_id: bob_id.clone(),
            thought_id: thought.id.clone(),
        }).await.unwrap();

        let notifs = store.notifications.lock().unwrap();
        assert_eq!(notifs.len(), 1);
        assert_eq!(notifs[0].user_id, alice.id);  // notification goes to alice
        assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Like));
    }

    #[tokio::test]
    async fn self_like_does_not_create_notification() {
        let store = TestStore::default();
        let alice = alice();
        let thought = Thought::new_local(
            ThoughtId::new(), alice.id.clone(),
            Content::new_local("hello").unwrap(),
            None, Visibility::Public, None, false,
        );
        store.users.lock().unwrap().push(alice.clone());
        store.thoughts.lock().unwrap().push(thought.clone());

        let handler = NotificationHandler {
            thoughts: Arc::new(store.clone()),
            notifications: Arc::new(store.clone()),
        };

        handler.handle(&DomainEvent::LikeAdded {
            like_id: LikeId::new(),
            user_id: alice.id.clone(), // alice likes her own thought
            thought_id: thought.id.clone(),
        }).await.unwrap();

        assert!(store.notifications.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn follow_accepted_creates_notification() {
        let store = TestStore::default();
        let alice = alice();
        let bob_id = UserId::new();
        store.users.lock().unwrap().push(alice.clone());

        let handler = NotificationHandler {
            thoughts: Arc::new(store.clone()),
            notifications: Arc::new(store.clone()),
        };

        // bob follows alice (alice gets notified)
        handler.handle(&DomainEvent::FollowAccepted {
            follower_id: bob_id.clone(),
            following_id: alice.id.clone(),
        }).await.unwrap();

        let notifs = store.notifications.lock().unwrap();
        assert_eq!(notifs.len(), 1);
        assert_eq!(notifs[0].user_id, alice.id);
        assert!(matches!(notifs[0].notification_type, domain::models::notification::NotificationType::Follow));
    }
}
  • Run: cargo test -p worker. Expected: FAIL (NotificationHandler is not defined yet).

  • Add the implementation to crates/worker/src/handlers.rs, above the tests module from the previous step:

use std::sync::Arc;
use chrono::Utc;
use domain::{
    errors::DomainError,
    events::DomainEvent,
    models::notification::{Notification, NotificationType},
    ports::{NotificationRepository, ThoughtRepository},
    value_objects::NotificationId,
};

/// Handles domain events that should create notifications for users.
pub struct NotificationHandler {
    pub thoughts:      Arc<dyn ThoughtRepository>,
    pub notifications: Arc<dyn NotificationRepository>,
}

impl NotificationHandler {
    pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
        match event {
            DomainEvent::LikeAdded { like_id: _, user_id, thought_id } => {
                let thought = match self.thoughts.find_by_id(thought_id).await? {
                    Some(t) => t,
                    None => return Ok(()), // thought deleted — skip
                };
                if thought.user_id == *user_id { return Ok(()); } // no self-notifications
                self.notifications.save(&Notification {
                    id: NotificationId::new(),
                    user_id: thought.user_id,
                    notification_type: NotificationType::Like,
                    from_user_id: Some(user_id.clone()),
                    thought_id: Some(thought_id.clone()),
                    read: false,
                    created_at: Utc::now(),
                }).await
            }
            DomainEvent::BoostAdded { boost_id: _, user_id, thought_id } => {
                let thought = match self.thoughts.find_by_id(thought_id).await? {
                    Some(t) => t,
                    None => return Ok(()),
                };
                if thought.user_id == *user_id { return Ok(()); }
                self.notifications.save(&Notification {
                    id: NotificationId::new(),
                    user_id: thought.user_id,
                    notification_type: NotificationType::Boost,
                    from_user_id: Some(user_id.clone()),
                    thought_id: Some(thought_id.clone()),
                    read: false,
                    created_at: Utc::now(),
                }).await
            }
            DomainEvent::FollowAccepted { follower_id, following_id } => {
                // The person being followed (following_id) gets notified
                self.notifications.save(&Notification {
                    id: NotificationId::new(),
                    user_id: following_id.clone(),
                    notification_type: NotificationType::Follow,
                    from_user_id: Some(follower_id.clone()),
                    thought_id: None,
                    read: false,
                    created_at: Utc::now(),
                }).await
            }
            // All other events: no notification needed in Plan 3
            _ => Ok(()),
        }
    }
}

/// Stub handler for ActivityPub federation — implemented in Plan 4.
pub struct FederationHandler;

impl FederationHandler {
    pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
        tracing::debug!(event = ?event, "federation handler (stub — Plan 4)");
        Ok(())
    }
}
  • Run: cargo test -p worker — Expected: 3 tests pass.

  • Commit:

git add crates/worker/
git commit -m "feat(worker): NotificationHandler and FederationHandler stub"

Task 4: worker main binary

Files:

  • Modify: crates/worker/src/main.rs

  • Write crates/worker/src/main.rs:

mod handlers;

use std::sync::Arc;
use futures::StreamExt;
use sqlx::PgPool;
use domain::ports::EventConsumer;

#[tokio::main]
async fn main() {
    dotenvy::dotenv().ok();
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
    let nats_url     = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());

    tracing::info!("Connecting to postgres...");
    let pool = PgPool::connect(&database_url).await.expect("DB connect failed");

    tracing::info!("Connecting to NATS at {nats_url}...");
    let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed");
    let consumer = nats::NatsEventConsumer::new(nats_client);

    let notification_handler = handlers::NotificationHandler {
        thoughts:      Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
        notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
    };
    let federation_handler = handlers::FederationHandler;

    tracing::info!("Worker started, consuming events...");

    let mut stream = consumer.consume();
    while let Some(result) = stream.next().await {
        match result {
            Ok(envelope) => {
                let event = &envelope.event;
                tracing::debug!(event = ?event, "received event");

                let n_result = notification_handler.handle(event).await;
                let f_result = federation_handler.handle(event).await;

                if n_result.is_ok() && f_result.is_ok() {
                    (envelope.ack)();
                } else {
                    if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); }
                    if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); }
                    (envelope.nack)();
                }
            }
            Err(e) => {
                tracing::error!("consumer error: {e}");
            }
        }
    }
}
  • Run: cargo build -p worker. Expected: compiles cleanly (binary thoughts-worker produced).

  • Smoke test (requires NATS running, and Task 5 applied so that the API server actually publishes to NATS):

# Terminal 1: start NATS if not already running
docker run -d --name nats -p 4222:4222 nats:latest || true

# Terminal 2: start worker
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \
RUST_LOG=debug \
cargo run --bin thoughts-worker &
sleep 2

# Terminal 3: start API server
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres NATS_URL=nats://localhost:4222 JWT_SECRET=dev cargo run -p presentation &
sleep 2

# Create a user, post a thought, like it — check that worker logs "received event"
TOKEN=$(curl -s -X POST http://localhost:3000/auth/register \
  -H 'content-type: application/json' \
  -d '{"username":"evttest","email":"evt@test.com","password":"pw"}' | jq -r .token)

TID=$(curl -s -X POST http://localhost:3000/thoughts \
  -H 'content-type: application/json' \
  -H "Authorization: Bearer $TOKEN" \
  -d '{"content":"event test"}' | jq -r .id)

curl -s -X POST http://localhost:3000/thoughts/$TID/like \
  -H "Authorization: Bearer $TOKEN"

kill %1 %2 2>/dev/null

Expected: worker logs show received event for the like. No errors.

  • Commit:
git add crates/worker/src/main.rs
git commit -m "feat(worker): consumer loop binary connecting NATS to handlers"
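
The consumer loop in main.rs acknowledges an event only when every handler succeeds and negatively acknowledges it otherwise. With core NATS both callbacks are no-ops, but the same shape would carry over to a transport that supports redelivery. Below is a std-only sketch of that rule. Envelope, Handler, dispatch, and counting_envelope are illustrative stand-ins: the real EventEnvelope lives in the domain crate, and its exact field types are not shown in this plan.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Illustrative stand-in for the domain crate's EventEnvelope.
struct Envelope {
    event: String,
    ack: Box<dyn FnOnce()>,
    nack: Box<dyn FnOnce()>,
}

type Handler = fn(&str) -> Result<(), String>;

/// Runs every handler (no short-circuit), then acks only if all succeeded.
/// Returns the collected handler errors so the caller can log them.
fn dispatch(envelope: Envelope, handlers: &[Handler]) -> Vec<String> {
    let errors: Vec<String> = handlers
        .iter()
        .filter_map(|h| h(envelope.event.as_str()).err())
        .collect();
    if errors.is_empty() {
        (envelope.ack)();
    } else {
        (envelope.nack)();
    }
    errors
}

/// Builds an envelope whose callbacks bump shared counters, for observation.
fn counting_envelope(
    event: &str,
    acks: &Arc<AtomicUsize>,
    nacks: &Arc<AtomicUsize>,
) -> Envelope {
    let (a, n) = (Arc::clone(acks), Arc::clone(nacks));
    Envelope {
        event: event.to_string(),
        ack: Box::new(move || {
            a.fetch_add(1, Ordering::SeqCst);
        }),
        nack: Box::new(move || {
            n.fetch_add(1, Ordering::SeqCst);
        }),
    }
}

fn ok_handler(_event: &str) -> Result<(), String> {
    Ok(())
}

fn failing_handler(event: &str) -> Result<(), String> {
    Err(format!("cannot handle {event}"))
}
```

Dispatching an envelope to ok_handler and failing_handler together bumps only the nack counter and returns the single error, while ok_handler still runs. That mirrors the loop above, where both handlers are awaited before the ack decision is made.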

Task 5: Presentation — swap NoOp for real NatsEventPublisher

Files:

  • Modify: crates/presentation/Cargo.toml
  • Modify: crates/presentation/src/lib.rs
  • Modify: crates/presentation/src/main.rs

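Read the NATS_URL env var at startup. If it is set and the connection succeeds, use the real NatsEventPublisher. If it is unset, log at info level and fall back to NoOpEventPublisher so the API still starts without NATS. If it is set but the connection fails, log a warning and fall back to the no-op publisher as well.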

  • Add nats to crates/presentation/Cargo.toml deps:
nats       = { workspace = true }
async-nats = { workspace = true }
  • Update crates/presentation/src/lib.rs: keep the NoOpEventPublisher struct as the fallback and replace build_state with an async version that optionally connects to NATS:

use std::sync::Arc;
use sqlx::PgPool;
use state::AppState;
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};

pub mod errors;
pub mod extractors;
pub mod handlers;
pub mod routes;
pub mod state;

use postgres_search::PgSearchRepository;

struct NoOpEventPublisher;
#[async_trait]
impl EventPublisher for NoOpEventPublisher {
    async fn publish(&self, _e: &DomainEvent) -> Result<(), DomainError> { Ok(()) }
}

pub async fn build_state(pool: PgPool, jwt_secret: String) -> AppState {
    let event_publisher: Arc<dyn EventPublisher> = match std::env::var("NATS_URL") {
        Ok(url) => {
            match async_nats::connect(&url).await {
                Ok(client) => {
                    tracing::info!("Connected to NATS at {url}");
                    Arc::new(nats::NatsEventPublisher::new(client))
                }
                Err(e) => {
                    tracing::warn!("Failed to connect to NATS at {url}: {e} — using no-op publisher");
                    Arc::new(NoOpEventPublisher)
                }
            }
        }
        Err(_) => {
            tracing::info!("NATS_URL not set — using no-op event publisher");
            Arc::new(NoOpEventPublisher)
        }
    };

    AppState {
        users:         Arc::new(postgres::user::PgUserRepository::new(pool.clone())),
        thoughts:      Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
        likes:         Arc::new(postgres::like::PgLikeRepository::new(pool.clone())),
        boosts:        Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())),
        follows:       Arc::new(postgres::follow::PgFollowRepository::new(pool.clone())),
        blocks:        Arc::new(postgres::block::PgBlockRepository::new(pool.clone())),
        tags:          Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
        api_keys:      Arc::new(postgres::api_key::PgApiKeyRepository::new(pool.clone())),
        top_friends:   Arc::new(postgres::top_friend::PgTopFriendRepository::new(pool.clone())),
        notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
        remote_actors: Arc::new(postgres::remote_actor::PgRemoteActorRepository::new(pool.clone())),
        feed:          Arc::new(postgres::feed::PgFeedRepository::new(pool.clone())),
        search:        Arc::new(PgSearchRepository::new(pool.clone())),
        auth:          Arc::new(auth::JwtAuthService::new(jwt_secret, 86400 * 30)),
        hasher:        Arc::new(auth::Argon2PasswordHasher),
        events:        event_publisher,
    }
}
  • Update crates/presentation/src/main.rs: build_state is now async, so await it:
use sqlx::PgPool;
use tower_http::cors::CorsLayer;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() {
    dotenvy::dotenv().ok();
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
    let jwt_secret   = std::env::var("JWT_SECRET").expect("JWT_SECRET required");
    let port         = std::env::var("PORT").unwrap_or_else(|_| "3000".into());

    let pool = PgPool::connect(&database_url).await.expect("DB connect failed");
    sqlx::migrate!("../adapters/postgres/migrations").run(&pool).await.expect("Migrations failed");

    let state = presentation::build_state(pool, jwt_secret).await; // note: .await
    let app = presentation::routes::router()
        .with_state(state)
        .layer(CorsLayer::permissive());

    let addr = format!("0.0.0.0:{port}");
    tracing::info!("Listening on {addr}");
    let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
  • Run: cargo build -p presentation

Expected: clean build.

  • Verify no-op fallback works (without NATS running):

DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres JWT_SECRET=dev \
RUST_LOG=info cargo run -p presentation &
sleep 2
# Should log: "NATS_URL not set — using no-op event publisher"
curl -s -X POST http://localhost:3000/auth/register \
  -H 'content-type: application/json' \
  -d '{"username":"natstest","email":"nats@test.com","password":"pw"}' | jq .token
kill %1
  • Run full test suite:
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace

Expected: all tests pass (the existing 52 plus the new worker tests, so 55 or more in total).

  • Commit:
git add crates/presentation/
git commit -m "feat(presentation): NatsEventPublisher with no-op fallback when NATS_URL unset"
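
The fallback in build_state has three branches: NATS_URL unset, connection failed, and connection succeeded. One possible way to unit-test that decision without a running NATS server is to pull it into a small function that takes the connect step as a parameter. The sketch below is only an illustration under stated assumptions: the trait is a simplified, synchronous stand-in for the real async EventPublisher port, and select_publisher, FakeNatsPublisher, and name() are hypothetical names that do not exist in the plan.

```rust
use std::sync::Arc;

// Simplified, synchronous stand-in for the plan's async EventPublisher port.
// `name()` is a hypothetical helper added only so tests can tell publishers apart.
trait EventPublisher {
    fn name(&self) -> &'static str;
}

// Plays the role of NoOpEventPublisher from lib.rs.
struct NoOpPublisher;
impl EventPublisher for NoOpPublisher {
    fn name(&self) -> &'static str {
        "no-op"
    }
}

// Hypothetical test double standing in for nats::NatsEventPublisher.
struct FakeNatsPublisher;
impl EventPublisher for FakeNatsPublisher {
    fn name(&self) -> &'static str {
        "nats"
    }
}

// Mirrors the three branches of build_state. The `connect` closure replaces
// the real async_nats::connect call so the decision can be tested in isolation.
fn select_publisher<F>(nats_url: Option<&str>, connect: F) -> Arc<dyn EventPublisher>
where
    F: FnOnce(&str) -> Result<Arc<dyn EventPublisher>, String>,
{
    match nats_url {
        // NATS_URL unset: no connection attempt is made.
        None => Arc::new(NoOpPublisher),
        Some(url) => match connect(url) {
            // Connection succeeded: use the real publisher.
            Ok(publisher) => publisher,
            // Connection failed: degrade to the no-op publisher.
            Err(_) => Arc::new(NoOpPublisher),
        },
    }
}
```

In the real crate the closure would be async and the logging calls would stay in build_state; the point of the sketch is only that each branch can be exercised by passing a different closure.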

Self-Review

Spec coverage:

  • event-payload: serializable EventPayload enum, subject(), From/TryFrom conversions (Task 1)
  • nats: NatsEventPublisher implementing EventPublisher (Task 2)
  • nats: NatsEventConsumer implementing EventConsumer via BoxStream (Task 2)
  • worker: NotificationHandler (LikeAdded, BoostAdded, FollowAccepted → notifications) (Task 3)
  • worker: FederationHandler stub (Task 3)
  • worker: consumer loop binary (Task 4)
  • presentation: real NATS publisher with graceful no-op fallback (Task 5)
  • event-publisher: stays as stub (correct — deferred per plan)

Placeholder scan: None — all code blocks complete.

Type consistency:

  • NatsEventPublisher::new(client: async_nats::Client) — matches usage in presentation lib.rs and worker main.rs
  • NatsEventConsumer::new(client: async_nats::Client) — matches worker main.rs
  • NotificationHandler { thoughts, notifications } — field names match handler usage in main.rs
  • build_state is now async fn — main.rs correctly awaits it
  • EventPayload::from(&DomainEvent) — implemented in nats crate (which sees both types)

Notes:

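  • Core NATS pub/sub (at-most-once delivery) is used, so events published while no worker is subscribed are dropped. JetStream (at-least-once delivery, with optional exactly-once semantics) is deferred to later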
  • Worker Cargo.toml includes postgres internal crate for database access in handlers
  • crates/adapters/nats uses nats as both its package name and its Rust crate name, so import it as use nats::... (not to be confused with the external async-nats crate, imported as async_nats)
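
To make the at-most-once note concrete, here is a toy in-memory model of fire-and-forget delivery. It is not NATS and does not use async-nats. ToyBus and the subject-like strings are hypothetical, chosen only to show that an event published before any subscriber exists is never delivered later.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Toy in-memory bus (hypothetical, not NATS): each event is handed to
// whoever is subscribed at publish time and is not stored anywhere.
struct ToyBus {
    subscribers: Vec<Sender<String>>,
}

impl ToyBus {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    // Registers a new subscriber; it only sees events published from now on.
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    // Sends the event to every current subscriber and returns how many
    // accepted it. With zero subscribers the event is simply lost.
    fn publish(&self, event: &str) -> usize {
        let mut delivered = 0;
        for tx in &self.subscribers {
            if tx.send(event.to_string()).is_ok() {
                delivered += 1;
            }
        }
        delivered
    }
}
```

In this model, anything published while the worker is not subscribed is gone for good, which is the trade-off the plan accepts until JetStream is introduced.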