diff --git a/Cargo.lock b/Cargo.lock index f3039ae..8e01c73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,42 +114,6 @@ dependencies = [ "stable_deref_trait", ] -[[package]] -name = "async-nats" -version = "0.39.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a798aab0c0203b31d67d501e5ed1f3ac6c36a329899ce47fc93c3bea53f3ae89" -dependencies = [ - "base64 0.22.1", - "bytes", - "futures", - "memchr", - "nkeys", - "nuid", - "once_cell", - "pin-project", - "portable-atomic", - "rand 0.8.5", - "regex", - "ring", - "rustls-native-certs 0.7.3", - "rustls-pemfile", - "rustls-webpki 0.102.8", - "serde", - "serde_json", - "serde_nanos", - "serde_repr", - "thiserror 1.0.69", - "time", - "tokio", - "tokio-rustls", - "tokio-util", - "tokio-websockets", - "tracing", - "tryhard", - "url", -] - [[package]] name = "async-nats" version = "0.45.0" @@ -2171,7 +2135,6 @@ name = "notes-api" version = "0.1.0" dependencies = [ "anyhow", - "async-nats 0.39.0", "async-trait", "axum 0.8.8", "axum-login", @@ -2203,6 +2166,7 @@ dependencies = [ "anyhow", "async-trait", "chrono", + "futures-core", "serde", "serde_json", "thiserror 2.0.17", @@ -2215,11 +2179,15 @@ dependencies = [ name = "notes-infra" version = "0.1.0" dependencies = [ + "async-nats", "async-trait", "chrono", "fastembed", + "futures-core", + "futures-util", "notes-domain", "qdrant-client", + "serde", "serde_json", "sqlx", "thiserror 2.0.17", @@ -2235,7 +2203,6 @@ name = "notes-worker" version = "0.1.0" dependencies = [ "anyhow", - "async-nats 0.45.0", "async-trait", "bytes", "chrono", diff --git a/notes-api/Cargo.toml b/notes-api/Cargo.toml index 8a2be79..d5b9e64 100644 --- a/notes-api/Cargo.toml +++ b/notes-api/Cargo.toml @@ -16,7 +16,7 @@ postgres = [ "tower-sessions-sqlx-store/postgres", "sqlx/postgres", ] -smart-features = ["notes-infra/smart-features", "dep:async-nats"] +smart-features = ["notes-infra/smart-features", "notes-infra/broker-nats"] [dependencies] notes-domain = { path = "../notes-domain" } @@ -36,7 +36,6 @@ tower-sessions-sqlx-store = { version = "0.15", features = ["sqlite"] } password-auth = "1.0" time = "0.3" async-trait = "0.1.89" -async-nats = { version = "0.39", optional = true } # Async runtime tokio = { version = "1.48.0", features = ["full"] } diff --git a/notes-api/src/main.rs b/notes-api/src/main.rs index e5abb72..fb1a02f 100644 --- a/notes-api/src/main.rs +++ b/notes-api/src/main.rs @@ -18,8 +18,6 @@ mod auth; mod config; mod dto; mod error; -#[cfg(feature = "smart-features")] -mod nats_broker; mod routes; mod state; @@ -81,13 +79,17 @@ async fn main() -> anyhow::Result<()> { .await .map_err(|e| anyhow::anyhow!(e))?; - // Connect to NATS (before creating services that depend on it) + // Connect to message broker via factory #[cfg(feature = "smart-features")] - let nats_client = { - tracing::info!("Connecting to NATS: {}", config.broker_url); - async_nats::connect(&config.broker_url) + let message_broker = { + use notes_infra::factory::{BrokerProvider, build_message_broker}; + tracing::info!("Connecting to message broker: {}", config.broker_url); + let provider = BrokerProvider::Nats { + url: config.broker_url.clone(), + }; + build_message_broker(&provider) .await - .map_err(|e| anyhow::anyhow!("NATS connection failed: {}", e))? + .map_err(|e| anyhow::anyhow!("Broker connection failed: {}", e))? }; // Create services @@ -95,9 +97,11 @@ async fn main() -> anyhow::Result<()> { // Build NoteService with optional MessageBroker #[cfg(feature = "smart-features")] - let note_service = { - let broker = Arc::new(nats_broker::NatsMessageBroker::new(nats_client.clone())); - Arc::new(NoteService::new(note_repo.clone(), tag_repo.clone()).with_message_broker(broker)) + let note_service = match message_broker { + Some(broker) => Arc::new( + NoteService::new(note_repo.clone(), tag_repo.clone()).with_message_broker(broker), + ), + None => Arc::new(NoteService::new(note_repo.clone(), tag_repo.clone())), }; #[cfg(not(feature = "smart-features"))] let note_service = Arc::new(NoteService::new(note_repo.clone(), tag_repo.clone())); @@ -115,8 +119,6 @@ async fn main() -> anyhow::Result<()> { note_service, tag_service, user_service, - #[cfg(feature = "smart-features")] - nats_client, config.clone(), ); diff --git a/notes-api/src/nats_broker.rs b/notes-api/src/nats_broker.rs deleted file mode 100644 index 431bf34..0000000 --- a/notes-api/src/nats_broker.rs +++ /dev/null @@ -1,31 +0,0 @@ -//! NATS message broker adapter for domain MessageBroker port - -use async_trait::async_trait; -use notes_domain::{DomainError, DomainResult, MessageBroker, Note}; - -/// NATS adapter implementing the MessageBroker port -pub struct NatsMessageBroker { - client: async_nats::Client, -} - -impl NatsMessageBroker { - pub fn new(client: async_nats::Client) -> Self { - Self { client } - } -} - -#[async_trait] -impl MessageBroker for NatsMessageBroker { - async fn publish_note_updated(&self, note: &Note) -> DomainResult<()> { - let payload = serde_json::to_vec(note).map_err(|e| { - DomainError::RepositoryError(format!("Failed to serialize note: {}", e)) - })?; - - self.client - .publish("notes.updated", payload.into()) - .await - .map_err(|e| DomainError::RepositoryError(format!("Failed to publish event: {}", e)))?; - - Ok(()) - } -} diff --git a/notes-api/src/state.rs b/notes-api/src/state.rs index 9a38ebc..1f6d97e 100644 --- a/notes-api/src/state.rs +++ b/notes-api/src/state.rs @@ -16,8 +16,6 @@ pub struct AppState { pub note_service: Arc, pub tag_service: Arc, pub user_service: Arc, - #[cfg(feature = "smart-features")] - pub nats_client: async_nats::Client, pub config: Config, } @@ -30,7 +28,6 @@ impl AppState { note_service: Arc, tag_service: Arc, user_service: Arc, - #[cfg(feature = "smart-features")] nats_client: async_nats::Client, config: Config, ) -> Self { Self { @@ -42,8 +39,6 @@ impl AppState { note_service, tag_service, user_service, - #[cfg(feature = "smart-features")] - nats_client, config, } } diff --git a/notes-domain/Cargo.toml b/notes-domain/Cargo.toml index a51a4cc..5b2b2ef 100644 --- a/notes-domain/Cargo.toml +++ b/notes-domain/Cargo.toml @@ -12,6 +12,7 @@ serde_json = "1.0.146" thiserror = "2.0.17" tracing = "0.1" uuid = { version = "1.19.0", features = ["v4", "serde"] } +futures-core = "0.3" [dev-dependencies] tokio = { version = "1", features = ["rt", "macros"] } diff --git a/notes-domain/src/ports.rs b/notes-domain/src/ports.rs index 11a7c51..473c9ae 100644 --- a/notes-domain/src/ports.rs +++ b/notes-domain/src/ports.rs @@ -42,4 +42,10 @@ pub trait LinkRepository: Send + Sync { pub trait MessageBroker: Send + Sync { /// Publish an event when a note is created or updated. async fn publish_note_updated(&self, note: &Note) -> DomainResult<()>; + + /// Subscribe to note update events. + /// Returns a stream of notes that have been updated. + async fn subscribe_note_updates( + &self, + ) -> DomainResult + Send>>>; } diff --git a/notes-infra/Cargo.toml b/notes-infra/Cargo.toml index cb3830e..c52de8c 100644 --- a/notes-infra/Cargo.toml +++ b/notes-infra/Cargo.toml @@ -4,10 +4,11 @@ version = "0.1.0" edition = "2024" [features] -default = ["sqlite", "smart-features"] +default = ["sqlite", "smart-features", "broker-nats"] sqlite = ["sqlx/sqlite", "tower-sessions-sqlx-store/sqlite"] postgres = ["sqlx/postgres", "tower-sessions-sqlx-store/postgres"] smart-features = ["dep:fastembed", "dep:qdrant-client"] +broker-nats = ["dep:async-nats", "dep:futures-util"] [dependencies] notes-domain = { path = "../notes-domain" } @@ -23,3 +24,7 @@ tower-sessions-sqlx-store = { version = "0.15.0", default-features = false } fastembed = { version = "5.4", optional = true } qdrant-client = { version = "1.16", optional = true } serde_json = "1.0" +serde = { version = "1.0", features = ["derive"] } +async-nats = { version = "0.45", optional = true } +futures-util = { version = "0.3", optional = true } +futures-core = "0.3" diff --git a/notes-infra/src/broker/mod.rs b/notes-infra/src/broker/mod.rs new file mode 100644 index 0000000..6defcd9 --- /dev/null +++ b/notes-infra/src/broker/mod.rs @@ -0,0 +1,7 @@ +//! Message broker adapters for various backends. +//! +//! This module provides implementations of the `MessageBroker` port +//! for different messaging backends. + +#[cfg(feature = "broker-nats")] +pub mod nats; diff --git a/notes-infra/src/broker/nats.rs b/notes-infra/src/broker/nats.rs new file mode 100644 index 0000000..05fb768 --- /dev/null +++ b/notes-infra/src/broker/nats.rs @@ -0,0 +1,66 @@ +//! NATS message broker adapter +//! +//! Implements the `MessageBroker` port for NATS messaging. + +use std::pin::Pin; + +use async_trait::async_trait; +use futures_util::StreamExt; +use notes_domain::{DomainError, DomainResult, MessageBroker, Note}; + +/// NATS adapter implementing the MessageBroker port. +pub struct NatsMessageBroker { + client: async_nats::Client, +} + +impl NatsMessageBroker { + /// Create a new NATS message broker by connecting to the given URL. + pub async fn connect(url: &str) -> Result { + let client = async_nats::connect(url).await?; + Ok(Self { client }) + } + + /// Create a NATS message broker from an existing client. + pub fn from_client(client: async_nats::Client) -> Self { + Self { client } + } +} + +#[async_trait] +impl MessageBroker for NatsMessageBroker { + async fn publish_note_updated(&self, note: &Note) -> DomainResult<()> { + let payload = serde_json::to_vec(note).map_err(|e| { + DomainError::RepositoryError(format!("Failed to serialize note: {}", e)) + })?; + + self.client + .publish("notes.updated", payload.into()) + .await + .map_err(|e| DomainError::RepositoryError(format!("Failed to publish event: {}", e)))?; + + Ok(()) + } + + async fn subscribe_note_updates( + &self, + ) -> DomainResult + Send>>> { + let subscriber = self + .client + .subscribe("notes.updated") + .await + .map_err(|e| DomainError::RepositoryError(format!("Failed to subscribe: {}", e)))?; + + // Transform the NATS message stream into a Note stream + let note_stream = subscriber.filter_map(|msg| async move { + match serde_json::from_slice::(&msg.payload) { + Ok(note) => Some(note), + Err(e) => { + tracing::warn!("Failed to deserialize note from message: {}", e); + None + } + } + }); + + Ok(Box::pin(note_stream)) + } +} diff --git a/notes-infra/src/factory.rs b/notes-infra/src/factory.rs index 94b79a8..5371fdd 100644 --- a/notes-infra/src/factory.rs +++ b/notes-infra/src/factory.rs @@ -57,6 +57,37 @@ pub async fn build_vector_store( } } +/// Configuration for message broker providers. +#[derive(Debug, Clone)] +pub enum BrokerProvider { + /// NATS message broker (requires `broker-nats` feature). + #[cfg(feature = "broker-nats")] + Nats { url: String }, + /// No message broker (messaging disabled). + None, +} + +/// Build a message broker based on the provider configuration. +/// Returns `None` if `BrokerProvider::None` is specified. +pub async fn build_message_broker( + provider: &BrokerProvider, +) -> FactoryResult>> { + match provider { + #[cfg(feature = "broker-nats")] + BrokerProvider::Nats { url } => { + let broker = crate::broker::nats::NatsMessageBroker::connect(url) + .await + .map_err(|e| { + FactoryError::Infrastructure(notes_domain::DomainError::RepositoryError( + format!("NATS connection failed: {}", e), + )) + })?; + Ok(Some(Arc::new(broker))) + } + BrokerProvider::None => Ok(None), + } +} + #[cfg(feature = "sqlite")] pub async fn build_link_repository( pool: &DatabasePool, diff --git a/notes-infra/src/lib.rs b/notes-infra/src/lib.rs index d8ae992..f091cfb 100644 --- a/notes-infra/src/lib.rs +++ b/notes-infra/src/lib.rs @@ -14,6 +14,8 @@ //! - [`db::create_pool`] - Create a database connection pool //! - [`db::run_migrations`] - Run database migrations +#[cfg(feature = "broker-nats")] +pub mod broker; pub mod db; #[cfg(feature = "smart-features")] pub mod embeddings; diff --git a/notes-worker/Cargo.toml b/notes-worker/Cargo.toml index 0a94082..cd298d8 100644 --- a/notes-worker/Cargo.toml +++ b/notes-worker/Cargo.toml @@ -7,11 +7,10 @@ edition = "2024" default = ["sqlite", "smart-features"] sqlite = ["notes-infra/sqlite", "sqlx/sqlite"] # postgres = ["notes-infra/postgres", "sqlx/postgres"] -smart-features = ["notes-infra/smart-features"] +smart-features = ["notes-infra/smart-features", "notes-infra/broker-nats"] [dependencies] anyhow = "1.0.100" -async-nats = "0.45.0" notes-domain = { path = "../notes-domain" } notes-infra = { path = "../notes-infra", default-features = false } serde = { version = "1.0.228", features = ["derive"] } diff --git a/notes-worker/src/main.rs b/notes-worker/src/main.rs index 5a291a0..75b0c8f 100644 --- a/notes-worker/src/main.rs +++ b/notes-worker/src/main.rs @@ -5,7 +5,8 @@ use notes_domain::services::SmartNoteService; use notes_infra::{ DatabaseConfig, factory::{ - build_database_pool, build_embedding_generator, build_link_repository, build_vector_store, + BrokerProvider, build_database_pool, build_embedding_generator, build_link_repository, + build_message_broker, build_vector_store, }, }; @@ -26,10 +27,18 @@ async fn main() -> anyhow::Result<()> { .init(); let config = Config::from_env(); - let nats_client = async_nats::connect(&config.broker_url).await?; #[cfg(feature = "smart-features")] { + // Connect to message broker via factory + tracing::info!("Connecting to message broker: {}", config.broker_url); + let broker_provider = BrokerProvider::Nats { + url: config.broker_url.clone(), + }; + let broker = build_message_broker(&broker_provider) + .await? + .expect("Message broker required for worker"); + let db_config = DatabaseConfig::new(config.database_url.clone()); let db_pool = build_database_pool(&db_config).await?; @@ -45,25 +54,15 @@ async fn main() -> anyhow::Result<()> { config.embedding_provider ); - // Subscribe to note update events - let mut subscriber = nats_client.subscribe("notes.updated").await?; + // Subscribe to note update events via the broker's stream API + let mut note_stream = broker.subscribe_note_updates().await?; tracing::info!("Worker listening on 'notes.updated'..."); - while let Some(msg) = subscriber.next().await { - // Parse message payload (assuming the payload IS the Note JSON) - let note_result: Result = serde_json::from_slice(&msg.payload); - - match note_result { - Ok(note) => { - tracing::info!("Processing smart features for note: {}", note.id); - match smart_service.process_note(¬e).await { - Ok(_) => tracing::info!("Successfully processed note {}", note.id), - Err(e) => tracing::error!("Failed to process note {}: {}", note.id, e), - } - } - Err(e) => { - tracing::error!("Failed to deserialize note from message: {}", e); - } + while let Some(note) = note_stream.next().await { + tracing::info!("Processing smart features for note: {}", note.id); + match smart_service.process_note(¬e).await { + Ok(_) => tracing::info!("Successfully processed note {}", note.id), + Err(e) => tracing::error!("Failed to process note {}: {}", note.id, e), } } }