feat: centralize NATS message broker implementation in infra and abstract its usage in API and worker

This commit is contained in:
2026-01-02 01:22:18 +01:00
parent 064d2ae748
commit 328f2f2fb2
14 changed files with 158 additions and 110 deletions

View File

@@ -4,10 +4,11 @@ version = "0.1.0"
edition = "2024"
[features]
default = ["sqlite", "smart-features"]
default = ["sqlite", "smart-features", "broker-nats"]
sqlite = ["sqlx/sqlite", "tower-sessions-sqlx-store/sqlite"]
postgres = ["sqlx/postgres", "tower-sessions-sqlx-store/postgres"]
smart-features = ["dep:fastembed", "dep:qdrant-client"]
broker-nats = ["dep:async-nats", "dep:futures-util"]
[dependencies]
notes-domain = { path = "../notes-domain" }
@@ -23,3 +24,7 @@ tower-sessions-sqlx-store = { version = "0.15.0", default-features = false }
fastembed = { version = "5.4", optional = true }
qdrant-client = { version = "1.16", optional = true }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
async-nats = { version = "0.45", optional = true }
futures-util = { version = "0.3", optional = true }
futures-core = "0.3"

View File

@@ -0,0 +1,7 @@
//! Message broker adapters for various backends.
//!
//! This module provides implementations of the `MessageBroker` port
//! for different messaging backends.
#[cfg(feature = "broker-nats")]
pub mod nats;

View File

@@ -0,0 +1,66 @@
//! NATS message broker adapter
//!
//! Implements the `MessageBroker` port for NATS messaging.
use std::pin::Pin;
use async_trait::async_trait;
use futures_util::StreamExt;
use notes_domain::{DomainError, DomainResult, MessageBroker, Note};
/// NATS adapter implementing the MessageBroker port.
pub struct NatsMessageBroker {
client: async_nats::Client,
}
impl NatsMessageBroker {
/// Create a new NATS message broker by connecting to the given URL.
pub async fn connect(url: &str) -> Result<Self, async_nats::ConnectError> {
let client = async_nats::connect(url).await?;
Ok(Self { client })
}
/// Create a NATS message broker from an existing client.
pub fn from_client(client: async_nats::Client) -> Self {
Self { client }
}
}
#[async_trait]
impl MessageBroker for NatsMessageBroker {
async fn publish_note_updated(&self, note: &Note) -> DomainResult<()> {
let payload = serde_json::to_vec(note).map_err(|e| {
DomainError::RepositoryError(format!("Failed to serialize note: {}", e))
})?;
self.client
.publish("notes.updated", payload.into())
.await
.map_err(|e| DomainError::RepositoryError(format!("Failed to publish event: {}", e)))?;
Ok(())
}
async fn subscribe_note_updates(
&self,
) -> DomainResult<Pin<Box<dyn futures_core::Stream<Item = Note> + Send>>> {
let subscriber = self
.client
.subscribe("notes.updated")
.await
.map_err(|e| DomainError::RepositoryError(format!("Failed to subscribe: {}", e)))?;
// Transform the NATS message stream into a Note stream
let note_stream = subscriber.filter_map(|msg| async move {
match serde_json::from_slice::<Note>(&msg.payload) {
Ok(note) => Some(note),
Err(e) => {
tracing::warn!("Failed to deserialize note from message: {}", e);
None
}
}
});
Ok(Box::pin(note_stream))
}
}

View File

@@ -57,6 +57,37 @@ pub async fn build_vector_store(
}
}
/// Configuration for message broker providers.
#[derive(Debug, Clone)]
pub enum BrokerProvider {
/// NATS message broker (requires `broker-nats` feature).
#[cfg(feature = "broker-nats")]
Nats { url: String },
/// No message broker (messaging disabled).
None,
}
/// Build a message broker based on the provider configuration.
/// Returns `None` if `BrokerProvider::None` is specified.
pub async fn build_message_broker(
provider: &BrokerProvider,
) -> FactoryResult<Option<Arc<dyn notes_domain::MessageBroker>>> {
match provider {
#[cfg(feature = "broker-nats")]
BrokerProvider::Nats { url } => {
let broker = crate::broker::nats::NatsMessageBroker::connect(url)
.await
.map_err(|e| {
FactoryError::Infrastructure(notes_domain::DomainError::RepositoryError(
format!("NATS connection failed: {}", e),
))
})?;
Ok(Some(Arc::new(broker)))
}
BrokerProvider::None => Ok(None),
}
}
#[cfg(feature = "sqlite")]
pub async fn build_link_repository(
pool: &DatabasePool,

View File

@@ -14,6 +14,8 @@
//! - [`db::create_pool`] - Create a database connection pool
//! - [`db::run_migrations`] - Run database migrations
#[cfg(feature = "broker-nats")]
pub mod broker;
pub mod db;
#[cfg(feature = "smart-features")]
pub mod embeddings;