diff --git a/.claude/worktrees/arch-refactors b/.claude/worktrees/arch-refactors new file mode 160000 index 0000000..e70a161 --- /dev/null +++ b/.claude/worktrees/arch-refactors @@ -0,0 +1 @@ +Subproject commit e70a1610bc8920e0c39cf10a68dbb5ae06a9010d diff --git a/Cargo.lock b/Cargo.lock index 52c460b..9bb70d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2450,6 +2450,8 @@ dependencies = [ "async-trait", "chrono", "domain", + "event-payload", + "serde_json", "sqlx", "thiserror 2.0.18", "tokio", @@ -4713,11 +4715,13 @@ dependencies = [ "async-nats", "domain", "dotenvy", + "event-payload", "event-transport", "futures", "nats", "postgres", "postgres-federation", + "serde_json", "sqlx", "tokio", "tracing", diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index f30f243..ae4ee47 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -4,8 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] -domain = { workspace = true } -sqlx = { workspace = true } +domain = { workspace = true } +event-payload = { workspace = true } +sqlx = { workspace = true } uuid = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } diff --git a/crates/adapters/postgres/migrations/011_outbox_events.sql b/crates/adapters/postgres/migrations/011_outbox_events.sql new file mode 100644 index 0000000..8f7e397 --- /dev/null +++ b/crates/adapters/postgres/migrations/011_outbox_events.sql @@ -0,0 +1,10 @@ +CREATE TABLE outbox_events ( + seq BIGSERIAL PRIMARY KEY, + aggregate_id UUID NOT NULL, + event_type TEXT NOT NULL, + payload JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + delivered BOOLEAN NOT NULL DEFAULT false, + delivered_at TIMESTAMPTZ +); +CREATE INDEX outbox_events_pending_idx ON outbox_events (seq) WHERE delivered = false; diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 29e2f1c..26c8139 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -4,6 +4,7 @@ pub mod block; pub mod boost; mod db_error; pub mod failed_event; +pub mod outbox; pub mod feed; pub mod follow; pub mod like; diff --git a/crates/adapters/postgres/src/outbox.rs b/crates/adapters/postgres/src/outbox.rs new file mode 100644 index 0000000..a24f6bb --- /dev/null +++ b/crates/adapters/postgres/src/outbox.rs @@ -0,0 +1,61 @@ +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::OutboxWriter}; +use event_payload::EventPayload; +use sqlx::PgPool; +use uuid::Uuid; + +pub struct PgOutboxWriter { + pool: PgPool, +} + +impl PgOutboxWriter { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +/// Primary aggregate UUID for an event — used to populate `aggregate_id`. +fn aggregate_id(event: &DomainEvent) -> Uuid { + match event { + DomainEvent::ThoughtCreated { thought_id, .. } => thought_id.as_uuid(), + DomainEvent::ThoughtDeleted { thought_id, .. } => thought_id.as_uuid(), + DomainEvent::ThoughtUpdated { thought_id, .. } => thought_id.as_uuid(), + DomainEvent::LikeAdded { thought_id, .. } => thought_id.as_uuid(), + DomainEvent::LikeRemoved { thought_id, .. } => thought_id.as_uuid(), + DomainEvent::BoostAdded { thought_id, .. } => thought_id.as_uuid(), + DomainEvent::BoostRemoved { thought_id, .. } => thought_id.as_uuid(), + DomainEvent::FollowRequested { follower_id, .. } => follower_id.as_uuid(), + DomainEvent::FollowAccepted { follower_id, .. } => follower_id.as_uuid(), + DomainEvent::FollowRejected { follower_id, .. } => follower_id.as_uuid(), + DomainEvent::Unfollowed { follower_id, .. } => follower_id.as_uuid(), + DomainEvent::UserBlocked { blocker_id, .. } => blocker_id.as_uuid(), + DomainEvent::UserUnblocked { blocker_id, .. } => blocker_id.as_uuid(), + DomainEvent::UserRegistered { user_id } => user_id.as_uuid(), + DomainEvent::ProfileUpdated { user_id } => user_id.as_uuid(), + DomainEvent::MentionReceived { thought_id, .. } => thought_id.as_uuid(), + } +} + +#[async_trait] +impl OutboxWriter for PgOutboxWriter { + async fn append(&self, event: &DomainEvent) -> Result<(), DomainError> { + let payload = EventPayload::from(event); + let event_type = payload.subject(); + let payload_json = + serde_json::to_value(&payload).map_err(|e| DomainError::Internal(e.to_string()))?; + let agg_id = aggregate_id(event); + + sqlx::query( + "INSERT INTO outbox_events (aggregate_id, event_type, payload) \ + VALUES ($1, $2, $3)", + ) + .bind(agg_id) + .bind(event_type) + .bind(payload_json) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(()) + } +} diff --git a/crates/application/src/use_cases/thoughts.rs b/crates/application/src/use_cases/thoughts.rs index f9838b9..e72a695 100644 --- a/crates/application/src/use_cases/thoughts.rs +++ b/crates/application/src/use_cases/thoughts.rs @@ -2,7 +2,7 @@ use domain::{ errors::DomainError, events::DomainEvent, models::thought::{Thought, Visibility}, - ports::{EventPublisher, TagRepository, ThoughtRepository, UserReader}, + ports::{EventPublisher, OutboxWriter, TagRepository, ThoughtRepository, UserReader}, value_objects::{Content, ThoughtId, UserId}, }; @@ -53,7 +53,8 @@ pub async fn create_thought( thoughts: &dyn ThoughtRepository, _users: &dyn UserReader, tags: &dyn TagRepository, - events: &dyn EventPublisher, + _events: &dyn EventPublisher, + outbox: &dyn OutboxWriter, input: CreateThoughtInput, ) -> Result { let content = Content::new_local(input.content)?; @@ -81,8 +82,8 @@ pub async fn create_thought( } } - events - .publish(&DomainEvent::ThoughtCreated { + outbox + .append(&DomainEvent::ThoughtCreated { thought_id: thought.id.clone(), user_id: thought.user_id.clone(), in_reply_to_id: input.in_reply_to_id, @@ -93,7 +94,8 @@ pub async fn create_thought( pub async fn delete_thought( thoughts: &dyn ThoughtRepository, - events: &dyn EventPublisher, + _events: &dyn EventPublisher, + outbox: &dyn OutboxWriter, id: &ThoughtId, user_id: &UserId, ) -> Result<(), DomainError> { @@ -103,8 +105,8 @@ pub async fn delete_thought( .ok_or(DomainError::NotFound)?; require_owner(&thought, user_id)?; thoughts.delete(id, user_id).await?; - events - .publish(&DomainEvent::ThoughtDeleted { + outbox + .append(&DomainEvent::ThoughtDeleted { thought_id: id.clone(), user_id: user_id.clone(), }) @@ -154,7 +156,7 @@ mod tests { use super::*; use domain::{ models::user::User, - testing::{NoOpEventPublisher, TestStore}, + testing::{NoOpEventPublisher, NoOpOutboxWriter, TestOutbox, TestStore}, value_objects::*, }; @@ -179,15 +181,18 @@ mod tests { } #[tokio::test] - async fn create_thought_saves_and_emits_event() { + async fn create_thought_saves_and_stages_outbox_event() { let store = TestStore::default(); + let outbox = TestOutbox::default(); let u = user(); store.users.lock().unwrap().push(u.clone()); - let out = create_thought(&store, &store, &store, &store, input(u.id.clone())) + let out = create_thought(&store, &store, &store, &NoOpEventPublisher, &outbox, input(u.id.clone())) .await .unwrap(); assert_eq!(out.thought.content.as_str(), "hello"); - assert_eq!(store.events.lock().unwrap().len(), 1); + let staged = outbox.staged(); + assert_eq!(staged.len(), 1); + assert!(matches!(staged[0], DomainEvent::ThoughtCreated { .. })); } #[tokio::test] @@ -200,11 +205,12 @@ mod tests { &store, &store, &NoOpEventPublisher, + &NoOpOutboxWriter, input(u.id.clone()), ) .await .unwrap(); - delete_thought(&store, &NoOpEventPublisher, &out.thought.id, &u.id) + delete_thought(&store, &NoOpEventPublisher, &NoOpOutboxWriter, &out.thought.id, &u.id) .await .unwrap(); assert!(store.thoughts.lock().unwrap().is_empty()); @@ -230,11 +236,12 @@ mod tests { &store, &store, &NoOpEventPublisher, + &NoOpOutboxWriter, input(alice.id.clone()), ) .await .unwrap(); - let err = delete_thought(&store, &NoOpEventPublisher, &out.thought.id, &bob.id) + let err = delete_thought(&store, &NoOpEventPublisher, &NoOpOutboxWriter, &out.thought.id, &bob.id) .await .unwrap_err(); assert!(matches!(err, DomainError::NotFound)); @@ -245,7 +252,7 @@ mod tests { let store = TestStore::default(); let alice = user(); store.users.lock().unwrap().push(alice.clone()); - let out = create_thought(&store, &store, &store, &store, input(alice.id.clone())) + let out = create_thought(&store, &store, &store, &NoOpEventPublisher, &NoOpOutboxWriter, input(alice.id.clone())) .await .unwrap(); let tid = out.thought.id.clone(); @@ -280,6 +287,7 @@ mod tests { &store, &store, &NoOpEventPublisher, + &NoOpOutboxWriter, input(alice.id.clone()), ) .await @@ -291,6 +299,7 @@ mod tests { &store, &store, &NoOpEventPublisher, + &NoOpOutboxWriter, CreateThoughtInput { user_id: alice.id.clone(), content: "reply".into(), diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 7207695..a0e7fe3 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -7,10 +7,11 @@ use std::sync::Arc; use activitypub::ThoughtsObjectHandler; use activitypub_base::service::ActivityPubService; -use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use domain::{errors::DomainError, events::DomainEvent, ports::{EventPublisher, OutboxWriter}}; use event_transport::EventPublisherAdapter; use nats::NatsTransport; use postgres::activitypub::PgActivityPubRepository; +use postgres::outbox::PgOutboxWriter; use postgres::remote_actor_connections::PgRemoteActorConnectionRepository; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use presentation::state::AppState; @@ -120,6 +121,7 @@ pub async fn build(cfg: &Config) -> Infrastructure { }), hasher: Arc::new(auth::Argon2PasswordHasher), events: event_publisher, + outbox: Arc::new(PgOutboxWriter::new(pool.clone())) as Arc, federation: ap_service.clone() as Arc, ap_repo: Arc::new(PgActivityPubRepository::new(pool.clone())), remote_actor_connections: Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())), diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 61b58e1..d5bac18 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -44,6 +44,11 @@ pub trait EventConsumer: Send + Sync { fn consume(&self) -> futures::stream::BoxStream<'_, Result>; } +#[async_trait] +pub trait OutboxWriter: Send + Sync { + async fn append(&self, event: &DomainEvent) -> Result<(), DomainError>; +} + #[async_trait] pub trait UserReader: Send + Sync { async fn find_by_id(&self, id: &UserId) -> Result, DomainError>; diff --git a/crates/domain/src/testing.rs b/crates/domain/src/testing.rs index 57a9686..ff2c0f8 100644 --- a/crates/domain/src/testing.rs +++ b/crates/domain/src/testing.rs @@ -937,6 +937,33 @@ impl EventPublisher for NoOpEventPublisher { } } +#[derive(Default, Clone)] +pub struct TestOutbox { + pub entries: Arc>>, +} + +impl TestOutbox { + pub fn staged(&self) -> Vec { + self.entries.lock().unwrap().clone() + } +} + +#[async_trait] +impl OutboxWriter for TestOutbox { + async fn append(&self, event: &DomainEvent) -> Result<(), DomainError> { + self.entries.lock().unwrap().push(event.clone()); + Ok(()) + } +} + +pub struct NoOpOutboxWriter; +#[async_trait] +impl OutboxWriter for NoOpOutboxWriter { + async fn append(&self, _e: &DomainEvent) -> Result<(), DomainError> { + Ok(()) + } +} + #[cfg(test)] mod ap_repo_tests { use super::*; diff --git a/crates/presentation/src/handlers/thoughts.rs b/crates/presentation/src/handlers/thoughts.rs index 73085a7..87c5b9c 100644 --- a/crates/presentation/src/handlers/thoughts.rs +++ b/crates/presentation/src/handlers/thoughts.rs @@ -66,6 +66,7 @@ pub async fn post_thought( &*s.users, &*s.tags, &*s.events, + &*s.outbox, CreateThoughtInput { user_id: uid.clone(), content: body.content, @@ -124,7 +125,7 @@ pub async fn delete_thought_handler( AuthUser(uid): AuthUser, Path(id): Path, ) -> Result { - delete_thought(&*s.thoughts, &*s.events, &ThoughtId::from_uuid(id), &uid).await?; + delete_thought(&*s.thoughts, &*s.events, &*s.outbox, &ThoughtId::from_uuid(id), &uid).await?; Ok(StatusCode::NO_CONTENT) } diff --git a/crates/presentation/src/state.rs b/crates/presentation/src/state.rs index 2242b84..7f77a91 100644 --- a/crates/presentation/src/state.rs +++ b/crates/presentation/src/state.rs @@ -19,6 +19,7 @@ pub struct AppState { pub auth: Arc, pub hasher: Arc, pub events: Arc, + pub outbox: Arc, pub federation: Arc, pub ap_repo: Arc, pub remote_actor_connections: Arc, diff --git a/crates/presentation/src/testing.rs b/crates/presentation/src/testing.rs index dd2d344..d7401e5 100644 --- a/crates/presentation/src/testing.rs +++ b/crates/presentation/src/testing.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use domain::{ errors::DomainError, ports::{AuthService, GeneratedToken, PasswordHasher}, - testing::TestStore, + testing::{NoOpOutboxWriter, TestStore}, value_objects::{PasswordHash, UserId}, }; use std::sync::Arc; @@ -48,6 +48,7 @@ pub fn make_state() -> AppState { auth: Arc::new(NoOpAuth), hasher: Arc::new(NoOpHasher), events: store.clone(), + outbox: Arc::new(NoOpOutboxWriter), federation: store.clone(), ap_repo: store.clone(), remote_actor_connections: store.clone(), diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index 12da308..f209a78 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -17,6 +17,7 @@ pub struct WorkerHandlers { } pub struct WorkerInfra { + pub pool: PgPool, pub consumer: event_transport::EventConsumerAdapter, pub handlers: WorkerHandlers, pub dlq_store: Arc, @@ -85,7 +86,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker }; // DLQ store - let dlq_store = Arc::new(PgFailedEventStore::new(pool)); + let dlq_store = Arc::new(PgFailedEventStore::new(pool.clone())); // NATS consumer + publisher let nats_client = async_nats::connect(nats_url) @@ -102,6 +103,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker ); WorkerInfra { + pool, consumer, handlers, dlq_store, diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index febe52f..c5caa91 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,6 +1,7 @@ mod dlq; mod factory; mod handlers; +mod outbox_relay; use domain::ports::EventConsumer; use futures::StreamExt; @@ -26,6 +27,16 @@ async fn main() { infra.event_publisher.clone(), )); + // Spawn outbox relay — polls DB for undelivered events and publishes them. + tokio::spawn( + outbox_relay::OutboxRelay { + pool: infra.pool.clone(), + publisher: infra.event_publisher.clone(), + poll_interval: std::time::Duration::from_secs(5), + } + .run(), + ); + tracing::info!("Worker started, consuming events..."); let mut stream = infra.consumer.consume(); while let Some(result) = stream.next().await { diff --git a/crates/worker/src/outbox_relay.rs b/crates/worker/src/outbox_relay.rs new file mode 100644 index 0000000..ada92d6 --- /dev/null +++ b/crates/worker/src/outbox_relay.rs @@ -0,0 +1,88 @@ +use domain::{events::DomainEvent, ports::EventPublisher}; +use event_payload::EventPayload; +use sqlx::PgPool; +use std::sync::Arc; +use std::time::Duration; + +pub struct OutboxRelay { + pub pool: PgPool, + pub publisher: Arc, + pub poll_interval: Duration, +} + +#[derive(sqlx::FromRow)] +struct OutboxRow { + seq: i64, + event_type: String, + payload: serde_json::Value, +} + +impl OutboxRelay { + pub async fn run(self) { + loop { + if let Err(e) = self.process_batch().await { + tracing::error!("outbox relay error: {e}"); + } + tokio::time::sleep(self.poll_interval).await; + } + } + + async fn process_batch(&self) -> Result<(), sqlx::Error> { + let rows = sqlx::query_as::<_, OutboxRow>( + "SELECT seq, event_type, payload \ + FROM outbox_events \ + WHERE delivered = false \ + ORDER BY seq ASC \ + LIMIT 100 \ + FOR UPDATE SKIP LOCKED", + ) + .fetch_all(&self.pool) + .await?; + + for row in rows { + let payload: EventPayload = match serde_json::from_value(row.payload.clone()) { + Ok(p) => p, + Err(e) => { + tracing::error!(seq = row.seq, event_type = row.event_type, "outbox: failed to deserialize payload: {e}"); + // Mark delivered to avoid blocking; investigate manually. + self.mark_delivered(row.seq).await?; + continue; + } + }; + + let domain_event = match DomainEvent::try_from(payload) { + Ok(ev) => ev, + Err(e) => { + tracing::error!(seq = row.seq, "outbox: failed to convert to DomainEvent: {e}"); + self.mark_delivered(row.seq).await?; + continue; + } + }; + + match self.publisher.publish(&domain_event).await { + Ok(()) => { + self.mark_delivered(row.seq).await?; + tracing::debug!(seq = row.seq, event_type = row.event_type, "outbox: delivered"); + } + Err(e) => { + tracing::warn!(seq = row.seq, "outbox: publish failed (will retry): {e}"); + // Leave delivered=false — will be retried next poll. + } + } + } + + Ok(()) + } + + async fn mark_delivered(&self, seq: i64) -> Result<(), sqlx::Error> { + sqlx::query( + "UPDATE outbox_events \ + SET delivered = true, delivered_at = now() \ + WHERE seq = $1", + ) + .bind(seq) + .execute(&self.pool) + .await?; + Ok(()) + } +}