feat(infra): transactional outbox — OutboxWriter port, PgOutboxWriter, OutboxRelay, TestOutbox; update create_thought + delete_thought
This commit is contained in:
1
.claude/worktrees/arch-refactors
Submodule
1
.claude/worktrees/arch-refactors
Submodule
Submodule .claude/worktrees/arch-refactors added at e70a1610bc
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -2450,6 +2450,8 @@ dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
"event-payload",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
@@ -4713,11 +4715,13 @@ dependencies = [
|
||||
"async-nats",
|
||||
"domain",
|
||||
"dotenvy",
|
||||
"event-payload",
|
||||
"event-transport",
|
||||
"futures",
|
||||
"nats",
|
||||
"postgres",
|
||||
"postgres-federation",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"tokio",
|
||||
"tracing",
|
||||
|
||||
@@ -5,6 +5,7 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
10
crates/adapters/postgres/migrations/011_outbox_events.sql
Normal file
10
crates/adapters/postgres/migrations/011_outbox_events.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
CREATE TABLE outbox_events (
|
||||
seq BIGSERIAL PRIMARY KEY,
|
||||
aggregate_id UUID NOT NULL,
|
||||
event_type TEXT NOT NULL,
|
||||
payload JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
delivered BOOLEAN NOT NULL DEFAULT false,
|
||||
delivered_at TIMESTAMPTZ
|
||||
);
|
||||
CREATE INDEX outbox_events_pending_idx ON outbox_events (seq) WHERE delivered = false;
|
||||
@@ -4,6 +4,7 @@ pub mod block;
|
||||
pub mod boost;
|
||||
mod db_error;
|
||||
pub mod failed_event;
|
||||
pub mod outbox;
|
||||
pub mod feed;
|
||||
pub mod follow;
|
||||
pub mod like;
|
||||
|
||||
61
crates/adapters/postgres/src/outbox.rs
Normal file
61
crates/adapters/postgres/src/outbox.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use async_trait::async_trait;
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::OutboxWriter};
|
||||
use event_payload::EventPayload;
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct PgOutboxWriter {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PgOutboxWriter {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
/// Primary aggregate UUID for an event — used to populate `aggregate_id`.
|
||||
fn aggregate_id(event: &DomainEvent) -> Uuid {
|
||||
match event {
|
||||
DomainEvent::ThoughtCreated { thought_id, .. } => thought_id.as_uuid(),
|
||||
DomainEvent::ThoughtDeleted { thought_id, .. } => thought_id.as_uuid(),
|
||||
DomainEvent::ThoughtUpdated { thought_id, .. } => thought_id.as_uuid(),
|
||||
DomainEvent::LikeAdded { thought_id, .. } => thought_id.as_uuid(),
|
||||
DomainEvent::LikeRemoved { thought_id, .. } => thought_id.as_uuid(),
|
||||
DomainEvent::BoostAdded { thought_id, .. } => thought_id.as_uuid(),
|
||||
DomainEvent::BoostRemoved { thought_id, .. } => thought_id.as_uuid(),
|
||||
DomainEvent::FollowRequested { follower_id, .. } => follower_id.as_uuid(),
|
||||
DomainEvent::FollowAccepted { follower_id, .. } => follower_id.as_uuid(),
|
||||
DomainEvent::FollowRejected { follower_id, .. } => follower_id.as_uuid(),
|
||||
DomainEvent::Unfollowed { follower_id, .. } => follower_id.as_uuid(),
|
||||
DomainEvent::UserBlocked { blocker_id, .. } => blocker_id.as_uuid(),
|
||||
DomainEvent::UserUnblocked { blocker_id, .. } => blocker_id.as_uuid(),
|
||||
DomainEvent::UserRegistered { user_id } => user_id.as_uuid(),
|
||||
DomainEvent::ProfileUpdated { user_id } => user_id.as_uuid(),
|
||||
DomainEvent::MentionReceived { thought_id, .. } => thought_id.as_uuid(),
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl OutboxWriter for PgOutboxWriter {
|
||||
async fn append(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let payload = EventPayload::from(event);
|
||||
let event_type = payload.subject();
|
||||
let payload_json =
|
||||
serde_json::to_value(&payload).map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
let agg_id = aggregate_id(event);
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO outbox_events (aggregate_id, event_type, payload) \
|
||||
VALUES ($1, $2, $3)",
|
||||
)
|
||||
.bind(agg_id)
|
||||
.bind(event_type)
|
||||
.bind(payload_json)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,7 @@ use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::thought::{Thought, Visibility},
|
||||
ports::{EventPublisher, TagRepository, ThoughtRepository, UserReader},
|
||||
ports::{EventPublisher, OutboxWriter, TagRepository, ThoughtRepository, UserReader},
|
||||
value_objects::{Content, ThoughtId, UserId},
|
||||
};
|
||||
|
||||
@@ -53,7 +53,8 @@ pub async fn create_thought(
|
||||
thoughts: &dyn ThoughtRepository,
|
||||
_users: &dyn UserReader,
|
||||
tags: &dyn TagRepository,
|
||||
events: &dyn EventPublisher,
|
||||
_events: &dyn EventPublisher,
|
||||
outbox: &dyn OutboxWriter,
|
||||
input: CreateThoughtInput,
|
||||
) -> Result<CreateThoughtOutput, DomainError> {
|
||||
let content = Content::new_local(input.content)?;
|
||||
@@ -81,8 +82,8 @@ pub async fn create_thought(
|
||||
}
|
||||
}
|
||||
|
||||
events
|
||||
.publish(&DomainEvent::ThoughtCreated {
|
||||
outbox
|
||||
.append(&DomainEvent::ThoughtCreated {
|
||||
thought_id: thought.id.clone(),
|
||||
user_id: thought.user_id.clone(),
|
||||
in_reply_to_id: input.in_reply_to_id,
|
||||
@@ -93,7 +94,8 @@ pub async fn create_thought(
|
||||
|
||||
pub async fn delete_thought(
|
||||
thoughts: &dyn ThoughtRepository,
|
||||
events: &dyn EventPublisher,
|
||||
_events: &dyn EventPublisher,
|
||||
outbox: &dyn OutboxWriter,
|
||||
id: &ThoughtId,
|
||||
user_id: &UserId,
|
||||
) -> Result<(), DomainError> {
|
||||
@@ -103,8 +105,8 @@ pub async fn delete_thought(
|
||||
.ok_or(DomainError::NotFound)?;
|
||||
require_owner(&thought, user_id)?;
|
||||
thoughts.delete(id, user_id).await?;
|
||||
events
|
||||
.publish(&DomainEvent::ThoughtDeleted {
|
||||
outbox
|
||||
.append(&DomainEvent::ThoughtDeleted {
|
||||
thought_id: id.clone(),
|
||||
user_id: user_id.clone(),
|
||||
})
|
||||
@@ -154,7 +156,7 @@ mod tests {
|
||||
use super::*;
|
||||
use domain::{
|
||||
models::user::User,
|
||||
testing::{NoOpEventPublisher, TestStore},
|
||||
testing::{NoOpEventPublisher, NoOpOutboxWriter, TestOutbox, TestStore},
|
||||
value_objects::*,
|
||||
};
|
||||
|
||||
@@ -179,15 +181,18 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_thought_saves_and_emits_event() {
|
||||
async fn create_thought_saves_and_stages_outbox_event() {
|
||||
let store = TestStore::default();
|
||||
let outbox = TestOutbox::default();
|
||||
let u = user();
|
||||
store.users.lock().unwrap().push(u.clone());
|
||||
let out = create_thought(&store, &store, &store, &store, input(u.id.clone()))
|
||||
let out = create_thought(&store, &store, &store, &NoOpEventPublisher, &outbox, input(u.id.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(out.thought.content.as_str(), "hello");
|
||||
assert_eq!(store.events.lock().unwrap().len(), 1);
|
||||
let staged = outbox.staged();
|
||||
assert_eq!(staged.len(), 1);
|
||||
assert!(matches!(staged[0], DomainEvent::ThoughtCreated { .. }));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -200,11 +205,12 @@ mod tests {
|
||||
&store,
|
||||
&store,
|
||||
&NoOpEventPublisher,
|
||||
&NoOpOutboxWriter,
|
||||
input(u.id.clone()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
delete_thought(&store, &NoOpEventPublisher, &out.thought.id, &u.id)
|
||||
delete_thought(&store, &NoOpEventPublisher, &NoOpOutboxWriter, &out.thought.id, &u.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(store.thoughts.lock().unwrap().is_empty());
|
||||
@@ -230,11 +236,12 @@ mod tests {
|
||||
&store,
|
||||
&store,
|
||||
&NoOpEventPublisher,
|
||||
&NoOpOutboxWriter,
|
||||
input(alice.id.clone()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let err = delete_thought(&store, &NoOpEventPublisher, &out.thought.id, &bob.id)
|
||||
let err = delete_thought(&store, &NoOpEventPublisher, &NoOpOutboxWriter, &out.thought.id, &bob.id)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, DomainError::NotFound));
|
||||
@@ -245,7 +252,7 @@ mod tests {
|
||||
let store = TestStore::default();
|
||||
let alice = user();
|
||||
store.users.lock().unwrap().push(alice.clone());
|
||||
let out = create_thought(&store, &store, &store, &store, input(alice.id.clone()))
|
||||
let out = create_thought(&store, &store, &store, &NoOpEventPublisher, &NoOpOutboxWriter, input(alice.id.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
let tid = out.thought.id.clone();
|
||||
@@ -280,6 +287,7 @@ mod tests {
|
||||
&store,
|
||||
&store,
|
||||
&NoOpEventPublisher,
|
||||
&NoOpOutboxWriter,
|
||||
input(alice.id.clone()),
|
||||
)
|
||||
.await
|
||||
@@ -291,6 +299,7 @@ mod tests {
|
||||
&store,
|
||||
&store,
|
||||
&NoOpEventPublisher,
|
||||
&NoOpOutboxWriter,
|
||||
CreateThoughtInput {
|
||||
user_id: alice.id.clone(),
|
||||
content: "reply".into(),
|
||||
|
||||
@@ -7,10 +7,11 @@ use std::sync::Arc;
|
||||
|
||||
use activitypub::ThoughtsObjectHandler;
|
||||
use activitypub_base::service::ActivityPubService;
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::{EventPublisher, OutboxWriter}};
|
||||
use event_transport::EventPublisherAdapter;
|
||||
use nats::NatsTransport;
|
||||
use postgres::activitypub::PgActivityPubRepository;
|
||||
use postgres::outbox::PgOutboxWriter;
|
||||
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
|
||||
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
||||
use presentation::state::AppState;
|
||||
@@ -120,6 +121,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
|
||||
}),
|
||||
hasher: Arc::new(auth::Argon2PasswordHasher),
|
||||
events: event_publisher,
|
||||
outbox: Arc::new(PgOutboxWriter::new(pool.clone())) as Arc<dyn OutboxWriter>,
|
||||
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
|
||||
ap_repo: Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||
remote_actor_connections: Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
|
||||
|
||||
@@ -44,6 +44,11 @@ pub trait EventConsumer: Send + Sync {
|
||||
fn consume(&self) -> futures::stream::BoxStream<'_, Result<EventEnvelope, DomainError>>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait OutboxWriter: Send + Sync {
|
||||
async fn append(&self, event: &DomainEvent) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait UserReader: Send + Sync {
|
||||
async fn find_by_id(&self, id: &UserId) -> Result<Option<User>, DomainError>;
|
||||
|
||||
@@ -937,6 +937,33 @@ impl EventPublisher for NoOpEventPublisher {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct TestOutbox {
|
||||
pub entries: Arc<Mutex<Vec<DomainEvent>>>,
|
||||
}
|
||||
|
||||
impl TestOutbox {
|
||||
pub fn staged(&self) -> Vec<DomainEvent> {
|
||||
self.entries.lock().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl OutboxWriter for TestOutbox {
|
||||
async fn append(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
self.entries.lock().unwrap().push(event.clone());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NoOpOutboxWriter;
|
||||
#[async_trait]
|
||||
impl OutboxWriter for NoOpOutboxWriter {
|
||||
async fn append(&self, _e: &DomainEvent) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod ap_repo_tests {
|
||||
use super::*;
|
||||
|
||||
@@ -66,6 +66,7 @@ pub async fn post_thought(
|
||||
&*s.users,
|
||||
&*s.tags,
|
||||
&*s.events,
|
||||
&*s.outbox,
|
||||
CreateThoughtInput {
|
||||
user_id: uid.clone(),
|
||||
content: body.content,
|
||||
@@ -124,7 +125,7 @@ pub async fn delete_thought_handler(
|
||||
AuthUser(uid): AuthUser,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
delete_thought(&*s.thoughts, &*s.events, &ThoughtId::from_uuid(id), &uid).await?;
|
||||
delete_thought(&*s.thoughts, &*s.events, &*s.outbox, &ThoughtId::from_uuid(id), &uid).await?;
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ pub struct AppState {
|
||||
pub auth: Arc<dyn AuthService>,
|
||||
pub hasher: Arc<dyn PasswordHasher>,
|
||||
pub events: Arc<dyn EventPublisher>,
|
||||
pub outbox: Arc<dyn OutboxWriter>,
|
||||
pub federation: Arc<dyn FederationActionPort>,
|
||||
pub ap_repo: Arc<dyn ActivityPubRepository>,
|
||||
pub remote_actor_connections: Arc<dyn RemoteActorConnectionRepository>,
|
||||
|
||||
@@ -3,7 +3,7 @@ use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
ports::{AuthService, GeneratedToken, PasswordHasher},
|
||||
testing::TestStore,
|
||||
testing::{NoOpOutboxWriter, TestStore},
|
||||
value_objects::{PasswordHash, UserId},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
@@ -48,6 +48,7 @@ pub fn make_state() -> AppState {
|
||||
auth: Arc::new(NoOpAuth),
|
||||
hasher: Arc::new(NoOpHasher),
|
||||
events: store.clone(),
|
||||
outbox: Arc::new(NoOpOutboxWriter),
|
||||
federation: store.clone(),
|
||||
ap_repo: store.clone(),
|
||||
remote_actor_connections: store.clone(),
|
||||
|
||||
@@ -17,6 +17,7 @@ pub struct WorkerHandlers {
|
||||
}
|
||||
|
||||
pub struct WorkerInfra {
|
||||
pub pool: PgPool,
|
||||
pub consumer: event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
|
||||
pub handlers: WorkerHandlers,
|
||||
pub dlq_store: Arc<PgFailedEventStore>,
|
||||
@@ -85,7 +86,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
||||
};
|
||||
|
||||
// DLQ store
|
||||
let dlq_store = Arc::new(PgFailedEventStore::new(pool));
|
||||
let dlq_store = Arc::new(PgFailedEventStore::new(pool.clone()));
|
||||
|
||||
// NATS consumer + publisher
|
||||
let nats_client = async_nats::connect(nats_url)
|
||||
@@ -102,6 +103,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
||||
);
|
||||
|
||||
WorkerInfra {
|
||||
pool,
|
||||
consumer,
|
||||
handlers,
|
||||
dlq_store,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
mod dlq;
|
||||
mod factory;
|
||||
mod handlers;
|
||||
mod outbox_relay;
|
||||
|
||||
use domain::ports::EventConsumer;
|
||||
use futures::StreamExt;
|
||||
@@ -26,6 +27,16 @@ async fn main() {
|
||||
infra.event_publisher.clone(),
|
||||
));
|
||||
|
||||
// Spawn outbox relay — polls DB for undelivered events and publishes them.
|
||||
tokio::spawn(
|
||||
outbox_relay::OutboxRelay {
|
||||
pool: infra.pool.clone(),
|
||||
publisher: infra.event_publisher.clone(),
|
||||
poll_interval: std::time::Duration::from_secs(5),
|
||||
}
|
||||
.run(),
|
||||
);
|
||||
|
||||
tracing::info!("Worker started, consuming events...");
|
||||
let mut stream = infra.consumer.consume();
|
||||
while let Some(result) = stream.next().await {
|
||||
|
||||
88
crates/worker/src/outbox_relay.rs
Normal file
88
crates/worker/src/outbox_relay.rs
Normal file
@@ -0,0 +1,88 @@
|
||||
use domain::{events::DomainEvent, ports::EventPublisher};
|
||||
use event_payload::EventPayload;
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct OutboxRelay {
|
||||
pub pool: PgPool,
|
||||
pub publisher: Arc<dyn EventPublisher>,
|
||||
pub poll_interval: Duration,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct OutboxRow {
|
||||
seq: i64,
|
||||
event_type: String,
|
||||
payload: serde_json::Value,
|
||||
}
|
||||
|
||||
impl OutboxRelay {
|
||||
pub async fn run(self) {
|
||||
loop {
|
||||
if let Err(e) = self.process_batch().await {
|
||||
tracing::error!("outbox relay error: {e}");
|
||||
}
|
||||
tokio::time::sleep(self.poll_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_batch(&self) -> Result<(), sqlx::Error> {
|
||||
let rows = sqlx::query_as::<_, OutboxRow>(
|
||||
"SELECT seq, event_type, payload \
|
||||
FROM outbox_events \
|
||||
WHERE delivered = false \
|
||||
ORDER BY seq ASC \
|
||||
LIMIT 100 \
|
||||
FOR UPDATE SKIP LOCKED",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
for row in rows {
|
||||
let payload: EventPayload = match serde_json::from_value(row.payload.clone()) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
tracing::error!(seq = row.seq, event_type = row.event_type, "outbox: failed to deserialize payload: {e}");
|
||||
// Mark delivered to avoid blocking; investigate manually.
|
||||
self.mark_delivered(row.seq).await?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let domain_event = match DomainEvent::try_from(payload) {
|
||||
Ok(ev) => ev,
|
||||
Err(e) => {
|
||||
tracing::error!(seq = row.seq, "outbox: failed to convert to DomainEvent: {e}");
|
||||
self.mark_delivered(row.seq).await?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match self.publisher.publish(&domain_event).await {
|
||||
Ok(()) => {
|
||||
self.mark_delivered(row.seq).await?;
|
||||
tracing::debug!(seq = row.seq, event_type = row.event_type, "outbox: delivered");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(seq = row.seq, "outbox: publish failed (will retry): {e}");
|
||||
// Leave delivered=false — will be retried next poll.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_delivered(&self, seq: i64) -> Result<(), sqlx::Error> {
|
||||
sqlx::query(
|
||||
"UPDATE outbox_events \
|
||||
SET delivered = true, delivered_at = now() \
|
||||
WHERE seq = $1",
|
||||
)
|
||||
.bind(seq)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user