feat(event-publisher): add event publisher adapter with configuration and integration

This commit is contained in:
2026-05-04 12:30:42 +02:00
parent 563f33212e
commit 17f90726e8
6 changed files with 108 additions and 13 deletions

11
Cargo.lock generated
View File

@@ -681,6 +681,16 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "event-publisher"
version = "0.1.0"
dependencies = [
"async-trait",
"domain",
"tokio",
"tracing",
]
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
@@ -1779,6 +1789,7 @@ dependencies = [
"chrono",
"domain",
"dotenvy",
"event-publisher",
"http-body-util",
"metadata",
"poster-fetcher",

View File

@@ -1,6 +1,6 @@
[workspace]
members = [
"crates/adapters/auth",
"crates/adapters/auth", "crates/adapters/event-publisher",
"crates/adapters/metadata", "crates/adapters/poster-fetcher", "crates/adapters/poster-storage",
"crates/adapters/rss",
"crates/adapters/sqlite",
@@ -41,6 +41,7 @@ auth = { path = "crates/adapters/auth" }
metadata = { path = "crates/adapters/metadata" }
poster-fetcher = { path = "crates/adapters/poster-fetcher" }
poster-storage = { path = "crates/adapters/poster-storage" }
event-publisher = { path = "crates/adapters/event-publisher" }
rss = { path = "crates/adapters/rss" }
sqlite = { path = "crates/adapters/sqlite" }
template-askama = { path = "crates/adapters/template-askama" }

View File

@@ -0,0 +1,10 @@
[package]
name = "event-publisher"
version = "0.1.0"
edition = "2024"
[dependencies]
domain = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

View File

@@ -0,0 +1,79 @@
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use tokio::sync::mpsc;
pub struct EventPublisherConfig {
pub channel_buffer: usize,
}
impl EventPublisherConfig {
pub fn from_env() -> Self {
let channel_buffer = std::env::var("EVENT_CHANNEL_BUFFER")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(128);
Self { channel_buffer }
}
}
pub struct ChannelEventPublisher {
sender: mpsc::Sender<DomainEvent>,
}
#[async_trait]
impl EventPublisher for ChannelEventPublisher {
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
self.sender
.send(event.clone())
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
}
pub struct EventWorker {
receiver: mpsc::Receiver<DomainEvent>,
}
impl EventWorker {
pub async fn run(mut self) {
while let Some(event) = self.receiver.recv().await {
match &event {
DomainEvent::ReviewLogged {
review_id,
movie_id,
user_id,
rating,
watched_at,
} => {
tracing::info!(
review_id = %review_id.value(),
movie_id = %movie_id.value(),
user_id = %user_id.value(),
rating = rating.value(),
watched_at = %watched_at,
"event: review_logged"
);
}
DomainEvent::MovieDiscovered {
movie_id,
external_metadata_id,
} => {
tracing::info!(
movie_id = %movie_id.value(),
external_id = external_metadata_id.value(),
"event: movie_discovered"
);
}
}
}
tracing::info!("event worker shut down");
}
}
pub fn create_event_channel(config: EventPublisherConfig) -> (ChannelEventPublisher, EventWorker) {
let (tx, rx) = mpsc::channel(config.channel_buffer);
(
ChannelEventPublisher { sender: tx },
EventWorker { receiver: rx },
)
}

View File

@@ -28,6 +28,7 @@ poster-storage = { workspace = true }
sqlite = { workspace = true }
sqlx = { workspace = true }
template-askama = { workspace = true }
event-publisher = { workspace = true }
rss = { workspace = true }
[dev-dependencies]

View File

@@ -1,8 +1,7 @@
use std::sync::Arc;
use anyhow::Context;
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use event_publisher::{EventPublisherConfig, create_event_channel};
use sqlx::SqlitePool;
use tokio::net::TcpListener;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -18,15 +17,6 @@ use template_askama::AskamaHtmlRenderer;
use presentation::{routes, state::AppState};
struct StubEventPublisher;
#[async_trait]
impl EventPublisher for StubEventPublisher {
async fn publish(&self, _event: &DomainEvent) -> Result<(), DomainError> {
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
@@ -64,12 +54,15 @@ async fn wire_dependencies() -> anyhow::Result<AppState> {
let user_repo = SqliteUserRepository::new(pool);
let (event_publisher, event_worker) = create_event_channel(EventPublisherConfig::from_env());
tokio::spawn(event_worker.run());
let app_ctx = AppContext {
repository: Arc::new(movie_repo),
metadata_client: Arc::new(MetadataClientImpl::new_omdb(omdb_api_key)),
poster_fetcher: Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?),
poster_storage: Arc::new(PosterStorageAdapter::from_config(storage_config)?),
event_publisher: Arc::new(StubEventPublisher),
event_publisher: Arc::new(event_publisher),
auth_service: Arc::new(JwtAuthService::new(auth_config)),
password_hasher: Arc::new(Argon2PasswordHasher),
user_repository: Arc::new(user_repo),