diff --git a/Cargo.lock b/Cargo.lock index 8457352..6d2724c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,7 @@ dependencies = [ "async-trait", "chrono", "domain", + "event-payload", "serde", "serde_json", "sqlx", @@ -3572,6 +3573,7 @@ dependencies = [ "domain", "dotenvy", "event-transport", + "futures", "tokio", "tracing", "tracing-subscriber", diff --git a/crates/adapters/event-transport/src/composite.rs b/crates/adapters/event-transport/src/composite.rs new file mode 100644 index 0000000..b5fa484 --- /dev/null +++ b/crates/adapters/event-transport/src/composite.rs @@ -0,0 +1,26 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + ports::{EventPublisher, EventStore}, +}; +use std::sync::Arc; + +pub struct CompositeEventPublisher { + primary: Arc, + store: Arc, +} + +impl CompositeEventPublisher { + pub fn new(primary: Arc, store: Arc) -> Self { + Self { primary, store } + } +} + +#[async_trait] +impl EventPublisher for CompositeEventPublisher { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + self.store.append(event).await?; + self.primary.publish(event).await + } +} diff --git a/crates/adapters/event-transport/src/lib.rs b/crates/adapters/event-transport/src/lib.rs index fd6d87d..46a6521 100644 --- a/crates/adapters/event-transport/src/lib.rs +++ b/crates/adapters/event-transport/src/lib.rs @@ -1,3 +1,6 @@ +pub mod composite; +pub use composite::CompositeEventPublisher; + use async_trait::async_trait; use domain::{ errors::DomainError, diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index ee72413..022cc1f 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -4,11 +4,12 @@ version = "0.1.0" edition = "2024" [dependencies] -domain = { workspace = true } -sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "migrate", "uuid", "chrono", "json"] } -uuid = { workspace = true } -chrono = { workspace = true } -anyhow = { workspace = true } -async-trait = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } +domain = { workspace = true } +event-payload = { workspace = true } +sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "migrate", "uuid", "chrono", "json"] } +uuid = { workspace = true } +chrono = { workspace = true } +anyhow = { workspace = true } +async-trait = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/adapters/postgres/migrations/010_event_log.sql b/crates/adapters/postgres/migrations/010_event_log.sql new file mode 100644 index 0000000..cb48407 --- /dev/null +++ b/crates/adapters/postgres/migrations/010_event_log.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS event_log ( + event_id BIGSERIAL PRIMARY KEY, + aggregate_id UUID NOT NULL, + event_type TEXT NOT NULL, + payload JSONB NOT NULL, + occurred_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX idx_event_log_aggregate ON event_log (aggregate_id); +CREATE INDEX idx_event_log_type ON event_log (event_type); +CREATE INDEX idx_event_log_occurred ON event_log (occurred_at); diff --git a/crates/adapters/postgres/src/event_store.rs b/crates/adapters/postgres/src/event_store.rs new file mode 100644 index 0000000..3f8cbc2 --- /dev/null +++ b/crates/adapters/postgres/src/event_store.rs @@ -0,0 +1,74 @@ +use crate::helpers::{pg_repo, MapDomainError}; +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + ports::EventStore, + value_objects::SystemId, +}; +use event_payload::EventPayload; +use uuid::Uuid; + +pg_repo!(PostgresEventStore); + +/// Extracts the primary aggregate ID from a domain event. +fn aggregate_id(event: &DomainEvent) -> Uuid { + match event { + DomainEvent::AssetIngested { asset_id, .. } + | DomainEvent::MetadataUpdated { asset_id, .. } + | DomainEvent::AssetDeleted { asset_id, .. } + | DomainEvent::SidecarSyncRequested { asset_id, .. } => *asset_id.as_uuid(), + + DomainEvent::ShareCreated { scope_id, .. } + | DomainEvent::ShareRevoked { scope_id, .. } => *scope_id.as_uuid(), + + DomainEvent::JobEnqueued { job_id, .. } + | DomainEvent::JobCompleted { job_id, .. } + | DomainEvent::JobFailed { job_id, .. } => *job_id.as_uuid(), + } +} + +#[async_trait] +impl EventStore for PostgresEventStore { + async fn append(&self, event: &DomainEvent) -> Result<(), DomainError> { + let payload = EventPayload::from(event); + let event_type = payload.subject().to_string(); + let json = serde_json::to_value(&payload) + .map_err(|e| DomainError::Internal(e.to_string()))?; + let agg_id = aggregate_id(event); + + sqlx::query( + "INSERT INTO event_log (aggregate_id, event_type, payload, occurred_at) + VALUES ($1, $2, $3, now())", + ) + .bind(agg_id) + .bind(event_type) + .bind(json) + .execute(&self.pool) + .await + .map_pg()?; + + Ok(()) + } + + async fn query_by_aggregate( + &self, + aggregate_id: &SystemId, + ) -> Result, DomainError> { + let rows: Vec<(serde_json::Value,)> = sqlx::query_as( + "SELECT payload FROM event_log WHERE aggregate_id = $1 ORDER BY event_id ASC", + ) + .bind(*aggregate_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_pg()?; + + rows.into_iter() + .map(|(json,)| { + let payload: EventPayload = serde_json::from_value(json) + .map_err(|e| DomainError::Internal(e.to_string()))?; + DomainEvent::try_from(payload) + }) + .collect() + } +} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index ce1d970..c06fd50 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -2,6 +2,7 @@ pub mod db; mod helpers; pub mod catalog; +pub mod event_store; pub mod identity; pub mod organization; pub mod processing; @@ -12,6 +13,7 @@ pub mod storage; pub use db::{PgPool, connect, run_migrations}; pub use catalog::*; +pub use event_store::PostgresEventStore; pub use identity::*; pub use organization::*; pub use processing::*; diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index be823bd..a9f37b3 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -22,8 +22,12 @@ pub async fn build_app(config: &Config) -> Result { adapters_nats::ensure_stream(&nats_client).await?; let transport = adapters_nats::NatsTransport::new(nats_client); - let event_publisher: Arc = + let nats_publisher: Arc = Arc::new(event_transport::EventPublisherAdapter::new(transport)); + let event_store: Arc = + Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone())); + let event_publisher: Arc = + Arc::new(event_transport::CompositeEventPublisher::new(nats_publisher, event_store)); let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string()); let file_storage: Arc = Arc::new(LocalFileStorage::new(&storage_path)); diff --git a/crates/domain/src/common/ports.rs b/crates/domain/src/common/ports.rs index b2d8729..9a2fb08 100644 --- a/crates/domain/src/common/ports.rs +++ b/crates/domain/src/common/ports.rs @@ -1,5 +1,6 @@ use crate::common::errors::DomainError; use crate::common::events::{DomainEvent, EventEnvelope}; +use crate::common::value_objects::SystemId; use async_trait::async_trait; use futures::stream::BoxStream; @@ -11,3 +12,9 @@ pub trait EventPublisher: Send + Sync { pub trait EventConsumer: Send + Sync { fn consume(&self) -> BoxStream<'_, Result>; } + +#[async_trait] +pub trait EventStore: Send + Sync { + async fn append(&self, event: &DomainEvent) -> Result<(), DomainError>; + async fn query_by_aggregate(&self, aggregate_id: &SystemId) -> Result, DomainError>; +} diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 106bc1d..d6ebac6 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -33,6 +33,8 @@ async fn main() -> anyhow::Result<()> { adapters_nats::ensure_stream(&nats_client).await?; info!(nats_url = %config.nats_url, "NATS connected"); + let event_store: Arc = + Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone())); let repos = Repos::new(pool); let file_storage = Arc::new(adapters_storage::LocalFileStorage::new( &config.storage_path, @@ -41,8 +43,10 @@ async fn main() -> anyhow::Result<()> { // Publisher transport consumes a client clone; the consumer gets another. let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone()); - let event_pub: Arc = + let nats_publisher: Arc = Arc::new(event_transport::EventPublisherAdapter::new(pub_transport)); + let event_pub: Arc = + Arc::new(event_transport::CompositeEventPublisher::new(nats_publisher, event_store)); let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer)); let process_next = Arc::new(build_process_next_handler(&repos, registry, event_pub));