From 64f81182288134eb05add967b6374bb9ace2d25c Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 26 Dec 2025 00:19:41 +0100 Subject: [PATCH] feat: Implement NATS-based event processing for note smart features and add Qdrant collection auto-creation. --- .gitignore | 4 +++- Cargo.lock | 39 +++++++++++++++++++++++++++++++- compose.yml | 12 +++++++++- notes-api/Cargo.toml | 1 + notes-api/src/config.rs | 6 +++++ notes-api/src/main.rs | 7 ++++++ notes-api/src/routes/notes.rs | 24 ++++++++++++++++++++ notes-api/src/state.rs | 3 +++ notes-infra/src/factory.rs | 1 + notes-infra/src/vector/qdrant.rs | 30 +++++++++++++++++++++++- notes-worker/src/main.rs | 34 ++++++++++++++++++++++++++-- 11 files changed, 155 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index c2610d1..38d0181 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ *.db-shm *.db-wal .DS_Store -*/.DS_Store \ No newline at end of file +*/.DS_Store +/data +/.fastembed_cache \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 253768f..2391d30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,6 +114,42 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "async-nats" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a798aab0c0203b31d67d501e5ed1f3ac6c36a329899ce47fc93c3bea53f3ae89" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "memchr", + "nkeys", + "nuid", + "once_cell", + "pin-project", + "portable-atomic", + "rand 0.8.5", + "regex", + "ring", + "rustls-native-certs 0.7.3", + "rustls-pemfile", + "rustls-webpki 0.102.8", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-rustls", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-nats" version = "0.45.0" @@ -2135,6 +2171,7 @@ name = "notes-api" version = "0.1.0" dependencies = [ "anyhow", + "async-nats 0.39.0", "async-trait", "axum 0.8.8", "axum-login", @@ -2197,7 +2234,7 @@ name = "notes-worker" version = "0.1.0" dependencies = [ "anyhow", - "async-nats", + "async-nats 0.45.0", "async-trait", "bytes", "chrono", diff --git a/compose.yml b/compose.yml index a327e56..48a543a 100644 --- a/compose.yml +++ b/compose.yml @@ -26,13 +26,23 @@ services: nats: image: nats:alpine - container_name: libertas_nats + container_name: k_notes_nats ports: - "4222:4222" - "6222:6222" - "8222:8222" restart: unless-stopped + qdrant: + image: qdrant/qdrant:latest + container_name: k_notes_qdrant + ports: + - "6333:6333" + - "6334:6334" + volumes: + - ./data/qdrant_storage:/qdrant/storage:z + restart: unless-stopped + # Optional: Define volumes explicitly if needed # volumes: # backend_data: diff --git a/notes-api/Cargo.toml b/notes-api/Cargo.toml index 15c2855..7891932 100644 --- a/notes-api/Cargo.toml +++ b/notes-api/Cargo.toml @@ -20,6 +20,7 @@ tower-sessions-sqlx-store = { version = "0.15", features = ["sqlite"] } password-auth = "1.0" time = "0.3" async-trait = "0.1.89" +async-nats = "0.39" # Async runtime tokio = { version = "1.48.0", features = ["full"] } diff --git a/notes-api/src/config.rs b/notes-api/src/config.rs index b753d4f..d01fe64 100644 --- a/notes-api/src/config.rs +++ b/notes-api/src/config.rs @@ -12,6 +12,7 @@ pub struct Config { pub allow_registration: bool, pub embedding_provider: EmbeddingProvider, pub vector_provider: VectorProvider, + pub broker_url: String, } impl Default for Config { @@ -29,6 +30,7 @@ impl Default for Config { url: "http://localhost:6334".to_string(), collection: "notes".to_string(), }, + broker_url: "nats://localhost:4222".to_string(), } } } @@ -77,6 +79,9 @@ impl Config { }, }; + let broker_url = + env::var("BROKER_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); + Self { host, port, @@ -86,6 +91,7 @@ impl Config { allow_registration, embedding_provider, vector_provider, + broker_url, } } } diff --git a/notes-api/src/main.rs b/notes-api/src/main.rs index 86f7a0a..f175b62 100644 --- a/notes-api/src/main.rs +++ b/notes-api/src/main.rs @@ -80,6 +80,12 @@ async fn main() -> anyhow::Result<()> { let tag_service = Arc::new(TagService::new(tag_repo.clone())); let user_service = Arc::new(UserService::new(user_repo.clone())); + // Connect to NATS + tracing::info!("Connecting to NATS: {}", config.broker_url); + let nats_client = async_nats::connect(&config.broker_url) + .await + .map_err(|e| anyhow::anyhow!("NATS connection failed: {}", e))?; + // Create application state let state = AppState::new( note_repo, @@ -88,6 +94,7 @@ async fn main() -> anyhow::Result<()> { note_service, tag_service, user_service, + nats_client, config.clone(), ); diff --git a/notes-api/src/routes/notes.rs b/notes-api/src/routes/notes.rs index bbb4986..541bc1b 100644 --- a/notes-api/src/routes/notes.rs +++ b/notes-api/src/routes/notes.rs @@ -82,6 +82,18 @@ pub async fn create_note( let note = state.note_service.create_note(domain_req).await?; + // Publish event + let payload = serde_json::to_vec(¬e).unwrap_or_default(); + if let Err(e) = state + .nats_client + .publish("notes.updated", payload.into()) + .await + { + tracing::error!("Failed to publish notes.updated event: {}", e); + } else { + tracing::info!("Published notes.updated event for note {}", note.id); + } + Ok((StatusCode::CREATED, Json(NoteResponse::from(note)))) } @@ -137,6 +149,18 @@ pub async fn update_note( let note = state.note_service.update_note(domain_req).await?; + // Publish event + let payload = serde_json::to_vec(¬e).unwrap_or_default(); + if let Err(e) = state + .nats_client + .publish("notes.updated", payload.into()) + .await + { + tracing::error!("Failed to publish notes.updated event: {}", e); + } else { + tracing::info!("Published notes.updated event for note {}", note.id); + } + Ok(Json(NoteResponse::from(note))) } diff --git a/notes-api/src/state.rs b/notes-api/src/state.rs index 2fea070..889193c 100644 --- a/notes-api/src/state.rs +++ b/notes-api/src/state.rs @@ -14,6 +14,7 @@ pub struct AppState { pub note_service: Arc, pub tag_service: Arc, pub user_service: Arc, + pub nats_client: async_nats::Client, pub config: Config, } @@ -25,6 +26,7 @@ impl AppState { note_service: Arc, tag_service: Arc, user_service: Arc, + nats_client: async_nats::Client, config: Config, ) -> Self { Self { @@ -34,6 +36,7 @@ impl AppState { note_service, tag_service, user_service, + nats_client, config, } } diff --git a/notes-infra/src/factory.rs b/notes-infra/src/factory.rs index 7956a08..ab31777 100644 --- a/notes-infra/src/factory.rs +++ b/notes-infra/src/factory.rs @@ -47,6 +47,7 @@ pub async fn build_vector_store( match provider { VectorProvider::Qdrant { url, collection } => { let adapter = crate::vector::qdrant::QdrantVectorAdapter::new(url, collection)?; + adapter.create_collection_if_not_exists().await?; Ok(Arc::new(adapter)) } } diff --git a/notes-infra/src/vector/qdrant.rs b/notes-infra/src/vector/qdrant.rs index 14838c0..dd20fa0 100644 --- a/notes-infra/src/vector/qdrant.rs +++ b/notes-infra/src/vector/qdrant.rs @@ -2,7 +2,10 @@ use async_trait::async_trait; use notes_domain::errors::{DomainError, DomainResult}; use notes_domain::ports::VectorStore; use qdrant_client::Qdrant; -use qdrant_client::qdrant::{PointStruct, SearchPointsBuilder, UpsertPointsBuilder, Value}; +use qdrant_client::qdrant::{ + CreateCollectionBuilder, Distance, PointStruct, SearchPointsBuilder, UpsertPointsBuilder, + Value, VectorParamsBuilder, +}; use std::collections::HashMap; use std::sync::Arc; use uuid::Uuid; @@ -23,6 +26,31 @@ impl QdrantVectorAdapter { collection_name: collection_name.to_string(), }) } + + pub async fn create_collection_if_not_exists(&self) -> DomainResult<()> { + if !self + .client + .collection_exists(&self.collection_name) + .await + .map_err(|e| { + DomainError::InfrastructureError(format!( + "Failed to check collection existence: {}", + e + )) + })? + { + self.client + .create_collection( + CreateCollectionBuilder::new(self.collection_name.clone()) + .vectors_config(VectorParamsBuilder::new(384, Distance::Cosine)), + ) + .await + .map_err(|e| { + DomainError::InfrastructureError(format!("Failed to create collection: {}", e)) + })?; + } + Ok(()) + } } #[async_trait] diff --git a/notes-worker/src/main.rs b/notes-worker/src/main.rs index 1a2100b..3b9cabd 100644 --- a/notes-worker/src/main.rs +++ b/notes-worker/src/main.rs @@ -1,3 +1,4 @@ +use futures_util::StreamExt; use notes_domain::services::SmartNoteService; use notes_infra::{ DatabaseConfig, @@ -10,8 +11,18 @@ use crate::config::Config; mod config; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + #[tokio::main] async fn main() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "notes_worker=info,notes_infra=info".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + let config = Config::from_env(); let nats_client = async_nats::connect(&config.broker_url).await?; let db_config = DatabaseConfig::new(config.database_url.clone()); @@ -29,8 +40,27 @@ async fn main() -> anyhow::Result<()> { config.embedding_provider ); - // subscribe to jobs and process them - // (Future: consume NATS messages and call smart_service.process_note(¬e)) + // Subscribe to note update events + let mut subscriber = nats_client.subscribe("notes.updated").await?; + tracing::info!("Worker listening on 'notes.updated'..."); + + while let Some(msg) = subscriber.next().await { + // Parse message payload (assuming the payload IS the Note JSON) + let note_result: Result = serde_json::from_slice(&msg.payload); + + match note_result { + Ok(note) => { + tracing::info!("Processing smart features for note: {}", note.id); + match smart_service.process_note(¬e).await { + Ok(_) => tracing::info!("Successfully processed note {}", note.id), + Err(e) => tracing::error!("Failed to process note {}: {}", note.id, e), + } + } + Err(e) => { + tracing::error!("Failed to deserialize note from message: {}", e); + } + } + } Ok(()) }