feat: event store — persist domain events to Postgres event_log table via composite publisher

This commit is contained in:
2026-05-31 18:36:10 +02:00
parent d022cb9068
commit aa09aec66b
10 changed files with 143 additions and 10 deletions

2
Cargo.lock generated
View File

@@ -38,6 +38,7 @@ dependencies = [
"async-trait", "async-trait",
"chrono", "chrono",
"domain", "domain",
"event-payload",
"serde", "serde",
"serde_json", "serde_json",
"sqlx", "sqlx",
@@ -3572,6 +3573,7 @@ dependencies = [
"domain", "domain",
"dotenvy", "dotenvy",
"event-transport", "event-transport",
"futures",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",

View File

@@ -0,0 +1,26 @@
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
ports::{EventPublisher, EventStore},
};
use std::sync::Arc;
pub struct CompositeEventPublisher {
primary: Arc<dyn EventPublisher>,
store: Arc<dyn EventStore>,
}
impl CompositeEventPublisher {
pub fn new(primary: Arc<dyn EventPublisher>, store: Arc<dyn EventStore>) -> Self {
Self { primary, store }
}
}
#[async_trait]
impl EventPublisher for CompositeEventPublisher {
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
self.store.append(event).await?;
self.primary.publish(event).await
}
}

View File

@@ -1,3 +1,6 @@
pub mod composite;
pub use composite::CompositeEventPublisher;
use async_trait::async_trait; use async_trait::async_trait;
use domain::{ use domain::{
errors::DomainError, errors::DomainError,

View File

@@ -4,11 +4,12 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
domain = { workspace = true } domain = { workspace = true }
sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "migrate", "uuid", "chrono", "json"] } event-payload = { workspace = true }
uuid = { workspace = true } sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "migrate", "uuid", "chrono", "json"] }
chrono = { workspace = true } uuid = { workspace = true }
anyhow = { workspace = true } chrono = { workspace = true }
async-trait = { workspace = true } anyhow = { workspace = true }
serde = { workspace = true } async-trait = { workspace = true }
serde_json = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true }

View File

@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS event_log (
event_id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
occurred_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_event_log_aggregate ON event_log (aggregate_id);
CREATE INDEX idx_event_log_type ON event_log (event_type);
CREATE INDEX idx_event_log_occurred ON event_log (occurred_at);

View File

@@ -0,0 +1,74 @@
use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
ports::EventStore,
value_objects::SystemId,
};
use event_payload::EventPayload;
use uuid::Uuid;
pg_repo!(PostgresEventStore);
/// Extracts the primary aggregate ID from a domain event.
fn aggregate_id(event: &DomainEvent) -> Uuid {
match event {
DomainEvent::AssetIngested { asset_id, .. }
| DomainEvent::MetadataUpdated { asset_id, .. }
| DomainEvent::AssetDeleted { asset_id, .. }
| DomainEvent::SidecarSyncRequested { asset_id, .. } => *asset_id.as_uuid(),
DomainEvent::ShareCreated { scope_id, .. }
| DomainEvent::ShareRevoked { scope_id, .. } => *scope_id.as_uuid(),
DomainEvent::JobEnqueued { job_id, .. }
| DomainEvent::JobCompleted { job_id, .. }
| DomainEvent::JobFailed { job_id, .. } => *job_id.as_uuid(),
}
}
#[async_trait]
impl EventStore for PostgresEventStore {
async fn append(&self, event: &DomainEvent) -> Result<(), DomainError> {
let payload = EventPayload::from(event);
let event_type = payload.subject().to_string();
let json = serde_json::to_value(&payload)
.map_err(|e| DomainError::Internal(e.to_string()))?;
let agg_id = aggregate_id(event);
sqlx::query(
"INSERT INTO event_log (aggregate_id, event_type, payload, occurred_at)
VALUES ($1, $2, $3, now())",
)
.bind(agg_id)
.bind(event_type)
.bind(json)
.execute(&self.pool)
.await
.map_pg()?;
Ok(())
}
async fn query_by_aggregate(
&self,
aggregate_id: &SystemId,
) -> Result<Vec<DomainEvent>, DomainError> {
let rows: Vec<(serde_json::Value,)> = sqlx::query_as(
"SELECT payload FROM event_log WHERE aggregate_id = $1 ORDER BY event_id ASC",
)
.bind(*aggregate_id.as_uuid())
.fetch_all(&self.pool)
.await
.map_pg()?;
rows.into_iter()
.map(|(json,)| {
let payload: EventPayload = serde_json::from_value(json)
.map_err(|e| DomainError::Internal(e.to_string()))?;
DomainEvent::try_from(payload)
})
.collect()
}
}

View File

@@ -2,6 +2,7 @@ pub mod db;
mod helpers; mod helpers;
pub mod catalog; pub mod catalog;
pub mod event_store;
pub mod identity; pub mod identity;
pub mod organization; pub mod organization;
pub mod processing; pub mod processing;
@@ -12,6 +13,7 @@ pub mod storage;
pub use db::{PgPool, connect, run_migrations}; pub use db::{PgPool, connect, run_migrations};
pub use catalog::*; pub use catalog::*;
pub use event_store::PostgresEventStore;
pub use identity::*; pub use identity::*;
pub use organization::*; pub use organization::*;
pub use processing::*; pub use processing::*;

View File

@@ -22,8 +22,12 @@ pub async fn build_app(config: &Config) -> Result<Router> {
adapters_nats::ensure_stream(&nats_client).await?; adapters_nats::ensure_stream(&nats_client).await?;
let transport = adapters_nats::NatsTransport::new(nats_client); let transport = adapters_nats::NatsTransport::new(nats_client);
let event_publisher: Arc<dyn domain::ports::EventPublisher> = let nats_publisher: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::EventPublisherAdapter::new(transport)); Arc::new(event_transport::EventPublisherAdapter::new(transport));
let event_store: Arc<dyn domain::ports::EventStore> =
Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
let event_publisher: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::CompositeEventPublisher::new(nats_publisher, event_store));
let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string()); let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string());
let file_storage: Arc<LocalFileStorage> = Arc::new(LocalFileStorage::new(&storage_path)); let file_storage: Arc<LocalFileStorage> = Arc::new(LocalFileStorage::new(&storage_path));

View File

@@ -1,5 +1,6 @@
use crate::common::errors::DomainError; use crate::common::errors::DomainError;
use crate::common::events::{DomainEvent, EventEnvelope}; use crate::common::events::{DomainEvent, EventEnvelope};
use crate::common::value_objects::SystemId;
use async_trait::async_trait; use async_trait::async_trait;
use futures::stream::BoxStream; use futures::stream::BoxStream;
@@ -11,3 +12,9 @@ pub trait EventPublisher: Send + Sync {
pub trait EventConsumer: Send + Sync { pub trait EventConsumer: Send + Sync {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>>; fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>>;
} }
#[async_trait]
pub trait EventStore: Send + Sync {
async fn append(&self, event: &DomainEvent) -> Result<(), DomainError>;
async fn query_by_aggregate(&self, aggregate_id: &SystemId) -> Result<Vec<DomainEvent>, DomainError>;
}

View File

@@ -33,6 +33,8 @@ async fn main() -> anyhow::Result<()> {
adapters_nats::ensure_stream(&nats_client).await?; adapters_nats::ensure_stream(&nats_client).await?;
info!(nats_url = %config.nats_url, "NATS connected"); info!(nats_url = %config.nats_url, "NATS connected");
let event_store: Arc<dyn domain::ports::EventStore> =
Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
let repos = Repos::new(pool); let repos = Repos::new(pool);
let file_storage = Arc::new(adapters_storage::LocalFileStorage::new( let file_storage = Arc::new(adapters_storage::LocalFileStorage::new(
&config.storage_path, &config.storage_path,
@@ -41,8 +43,10 @@ async fn main() -> anyhow::Result<()> {
// Publisher transport consumes a client clone; the consumer gets another. // Publisher transport consumes a client clone; the consumer gets another.
let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone()); let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
let event_pub: Arc<dyn domain::ports::EventPublisher> = let nats_publisher: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::EventPublisherAdapter::new(pub_transport)); Arc::new(event_transport::EventPublisherAdapter::new(pub_transport));
let event_pub: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::CompositeEventPublisher::new(nats_publisher, event_store));
let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer)); let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer));
let process_next = Arc::new(build_process_next_handler(&repos, registry, event_pub)); let process_next = Arc::new(build_process_next_handler(&repos, registry, event_pub));