From 17f90726e81128a050a92d0035404ab78073f5bf Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Mon, 4 May 2026 12:30:42 +0200 Subject: [PATCH] feat(event-publisher): add event publisher adapter with configuration and integration --- Cargo.lock | 11 +++ Cargo.toml | 3 +- crates/adapters/event-publisher/Cargo.toml | 10 +++ crates/adapters/event-publisher/src/lib.rs | 79 ++++++++++++++++++++++ crates/presentation/Cargo.toml | 1 + crates/presentation/src/main.rs | 17 ++--- 6 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 crates/adapters/event-publisher/Cargo.toml create mode 100644 crates/adapters/event-publisher/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 8c44f36..bb71a0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -681,6 +681,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-publisher" +version = "0.1.0" +dependencies = [ + "async-trait", + "domain", + "tokio", + "tracing", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1779,6 +1789,7 @@ dependencies = [ "chrono", "domain", "dotenvy", + "event-publisher", "http-body-util", "metadata", "poster-fetcher", diff --git a/Cargo.toml b/Cargo.toml index 52b4a24..6ccac46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] members = [ - "crates/adapters/auth", + "crates/adapters/auth", "crates/adapters/event-publisher", "crates/adapters/metadata", "crates/adapters/poster-fetcher", "crates/adapters/poster-storage", "crates/adapters/rss", "crates/adapters/sqlite", @@ -41,6 +41,7 @@ auth = { path = "crates/adapters/auth" } metadata = { path = "crates/adapters/metadata" } poster-fetcher = { path = "crates/adapters/poster-fetcher" } poster-storage = { path = "crates/adapters/poster-storage" } +event-publisher = { path = "crates/adapters/event-publisher" } rss = { path = "crates/adapters/rss" } sqlite = { path = "crates/adapters/sqlite" } template-askama = { path = "crates/adapters/template-askama" } diff --git a/crates/adapters/event-publisher/Cargo.toml b/crates/adapters/event-publisher/Cargo.toml new file mode 100644 index 0000000..d7044a8 --- /dev/null +++ b/crates/adapters/event-publisher/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "event-publisher" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +async-trait = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/crates/adapters/event-publisher/src/lib.rs b/crates/adapters/event-publisher/src/lib.rs new file mode 100644 index 0000000..c60baa0 --- /dev/null +++ b/crates/adapters/event-publisher/src/lib.rs @@ -0,0 +1,79 @@ +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use tokio::sync::mpsc; + +pub struct EventPublisherConfig { + pub channel_buffer: usize, +} + +impl EventPublisherConfig { + pub fn from_env() -> Self { + let channel_buffer = std::env::var("EVENT_CHANNEL_BUFFER") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(128); + Self { channel_buffer } + } +} + +pub struct ChannelEventPublisher { + sender: mpsc::Sender, +} + +#[async_trait] +impl EventPublisher for ChannelEventPublisher { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + self.sender + .send(event.clone()) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } +} + +pub struct EventWorker { + receiver: mpsc::Receiver, +} + +impl EventWorker { + pub async fn run(mut self) { + while let Some(event) = self.receiver.recv().await { + match &event { + DomainEvent::ReviewLogged { + review_id, + movie_id, + user_id, + rating, + watched_at, + } => { + tracing::info!( + review_id = %review_id.value(), + movie_id = %movie_id.value(), + user_id = %user_id.value(), + rating = rating.value(), + watched_at = %watched_at, + "event: review_logged" + ); + } + DomainEvent::MovieDiscovered { + movie_id, + external_metadata_id, + } => { + tracing::info!( + movie_id = %movie_id.value(), + external_id = external_metadata_id.value(), + "event: movie_discovered" + ); + } + } + } + tracing::info!("event worker shut down"); + } +} + +pub fn create_event_channel(config: EventPublisherConfig) -> (ChannelEventPublisher, EventWorker) { + let (tx, rx) = mpsc::channel(config.channel_buffer); + ( + ChannelEventPublisher { sender: tx }, + EventWorker { receiver: rx }, + ) +} diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 4571aa4..06ebbf4 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -28,6 +28,7 @@ poster-storage = { workspace = true } sqlite = { workspace = true } sqlx = { workspace = true } template-askama = { workspace = true } +event-publisher = { workspace = true } rss = { workspace = true } [dev-dependencies] diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 4f17d1c..e1cbed7 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -1,8 +1,7 @@ use std::sync::Arc; use anyhow::Context; -use async_trait::async_trait; -use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use event_publisher::{EventPublisherConfig, create_event_channel}; use sqlx::SqlitePool; use tokio::net::TcpListener; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -18,15 +17,6 @@ use template_askama::AskamaHtmlRenderer; use presentation::{routes, state::AppState}; -struct StubEventPublisher; - -#[async_trait] -impl EventPublisher for StubEventPublisher { - async fn publish(&self, _event: &DomainEvent) -> Result<(), DomainError> { - Ok(()) - } -} - #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); @@ -64,12 +54,15 @@ async fn wire_dependencies() -> anyhow::Result { let user_repo = SqliteUserRepository::new(pool); + let (event_publisher, event_worker) = create_event_channel(EventPublisherConfig::from_env()); + tokio::spawn(event_worker.run()); + let app_ctx = AppContext { repository: Arc::new(movie_repo), metadata_client: Arc::new(MetadataClientImpl::new_omdb(omdb_api_key)), poster_fetcher: Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?), poster_storage: Arc::new(PosterStorageAdapter::from_config(storage_config)?), - event_publisher: Arc::new(StubEventPublisher), + event_publisher: Arc::new(event_publisher), auth_service: Arc::new(JwtAuthService::new(auth_config)), password_hasher: Arc::new(Argon2PasswordHasher), user_repository: Arc::new(user_repo),