feat: Implement NATS-based event processing for note smart features and add Qdrant collection auto-creation.
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -5,3 +5,5 @@
|
|||||||
*.db-wal
|
*.db-wal
|
||||||
.DS_Store
|
.DS_Store
|
||||||
*/.DS_Store
|
*/.DS_Store
|
||||||
|
/data
|
||||||
|
/.fastembed_cache
|
||||||
39
Cargo.lock
generated
39
Cargo.lock
generated
@@ -114,6 +114,42 @@ dependencies = [
|
|||||||
"stable_deref_trait",
|
"stable_deref_trait",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-nats"
|
||||||
|
version = "0.39.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a798aab0c0203b31d67d501e5ed1f3ac6c36a329899ce47fc93c3bea53f3ae89"
|
||||||
|
dependencies = [
|
||||||
|
"base64 0.22.1",
|
||||||
|
"bytes",
|
||||||
|
"futures",
|
||||||
|
"memchr",
|
||||||
|
"nkeys",
|
||||||
|
"nuid",
|
||||||
|
"once_cell",
|
||||||
|
"pin-project",
|
||||||
|
"portable-atomic",
|
||||||
|
"rand 0.8.5",
|
||||||
|
"regex",
|
||||||
|
"ring",
|
||||||
|
"rustls-native-certs 0.7.3",
|
||||||
|
"rustls-pemfile",
|
||||||
|
"rustls-webpki 0.102.8",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"serde_nanos",
|
||||||
|
"serde_repr",
|
||||||
|
"thiserror 1.0.69",
|
||||||
|
"time",
|
||||||
|
"tokio",
|
||||||
|
"tokio-rustls",
|
||||||
|
"tokio-util",
|
||||||
|
"tokio-websockets",
|
||||||
|
"tracing",
|
||||||
|
"tryhard",
|
||||||
|
"url",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-nats"
|
name = "async-nats"
|
||||||
version = "0.45.0"
|
version = "0.45.0"
|
||||||
@@ -2135,6 +2171,7 @@ name = "notes-api"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"async-nats 0.39.0",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"axum 0.8.8",
|
"axum 0.8.8",
|
||||||
"axum-login",
|
"axum-login",
|
||||||
@@ -2197,7 +2234,7 @@ name = "notes-worker"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-nats",
|
"async-nats 0.45.0",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
|||||||
12
compose.yml
12
compose.yml
@@ -26,13 +26,23 @@ services:
|
|||||||
|
|
||||||
nats:
|
nats:
|
||||||
image: nats:alpine
|
image: nats:alpine
|
||||||
container_name: libertas_nats
|
container_name: k_notes_nats
|
||||||
ports:
|
ports:
|
||||||
- "4222:4222"
|
- "4222:4222"
|
||||||
- "6222:6222"
|
- "6222:6222"
|
||||||
- "8222:8222"
|
- "8222:8222"
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|
||||||
|
qdrant:
|
||||||
|
image: qdrant/qdrant:latest
|
||||||
|
container_name: k_notes_qdrant
|
||||||
|
ports:
|
||||||
|
- "6333:6333"
|
||||||
|
- "6334:6334"
|
||||||
|
volumes:
|
||||||
|
- ./data/qdrant_storage:/qdrant/storage:z
|
||||||
|
restart: unless-stopped
|
||||||
|
|
||||||
# Optional: Define volumes explicitly if needed
|
# Optional: Define volumes explicitly if needed
|
||||||
# volumes:
|
# volumes:
|
||||||
# backend_data:
|
# backend_data:
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ tower-sessions-sqlx-store = { version = "0.15", features = ["sqlite"] }
|
|||||||
password-auth = "1.0"
|
password-auth = "1.0"
|
||||||
time = "0.3"
|
time = "0.3"
|
||||||
async-trait = "0.1.89"
|
async-trait = "0.1.89"
|
||||||
|
async-nats = "0.39"
|
||||||
|
|
||||||
# Async runtime
|
# Async runtime
|
||||||
tokio = { version = "1.48.0", features = ["full"] }
|
tokio = { version = "1.48.0", features = ["full"] }
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ pub struct Config {
|
|||||||
pub allow_registration: bool,
|
pub allow_registration: bool,
|
||||||
pub embedding_provider: EmbeddingProvider,
|
pub embedding_provider: EmbeddingProvider,
|
||||||
pub vector_provider: VectorProvider,
|
pub vector_provider: VectorProvider,
|
||||||
|
pub broker_url: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
@@ -29,6 +30,7 @@ impl Default for Config {
|
|||||||
url: "http://localhost:6334".to_string(),
|
url: "http://localhost:6334".to_string(),
|
||||||
collection: "notes".to_string(),
|
collection: "notes".to_string(),
|
||||||
},
|
},
|
||||||
|
broker_url: "nats://localhost:4222".to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -77,6 +79,9 @@ impl Config {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let broker_url =
|
||||||
|
env::var("BROKER_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
host,
|
host,
|
||||||
port,
|
port,
|
||||||
@@ -86,6 +91,7 @@ impl Config {
|
|||||||
allow_registration,
|
allow_registration,
|
||||||
embedding_provider,
|
embedding_provider,
|
||||||
vector_provider,
|
vector_provider,
|
||||||
|
broker_url,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,6 +80,12 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let tag_service = Arc::new(TagService::new(tag_repo.clone()));
|
let tag_service = Arc::new(TagService::new(tag_repo.clone()));
|
||||||
let user_service = Arc::new(UserService::new(user_repo.clone()));
|
let user_service = Arc::new(UserService::new(user_repo.clone()));
|
||||||
|
|
||||||
|
// Connect to NATS
|
||||||
|
tracing::info!("Connecting to NATS: {}", config.broker_url);
|
||||||
|
let nats_client = async_nats::connect(&config.broker_url)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("NATS connection failed: {}", e))?;
|
||||||
|
|
||||||
// Create application state
|
// Create application state
|
||||||
let state = AppState::new(
|
let state = AppState::new(
|
||||||
note_repo,
|
note_repo,
|
||||||
@@ -88,6 +94,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
note_service,
|
note_service,
|
||||||
tag_service,
|
tag_service,
|
||||||
user_service,
|
user_service,
|
||||||
|
nats_client,
|
||||||
config.clone(),
|
config.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -82,6 +82,18 @@ pub async fn create_note(
|
|||||||
|
|
||||||
let note = state.note_service.create_note(domain_req).await?;
|
let note = state.note_service.create_note(domain_req).await?;
|
||||||
|
|
||||||
|
// Publish event
|
||||||
|
let payload = serde_json::to_vec(¬e).unwrap_or_default();
|
||||||
|
if let Err(e) = state
|
||||||
|
.nats_client
|
||||||
|
.publish("notes.updated", payload.into())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!("Failed to publish notes.updated event: {}", e);
|
||||||
|
} else {
|
||||||
|
tracing::info!("Published notes.updated event for note {}", note.id);
|
||||||
|
}
|
||||||
|
|
||||||
Ok((StatusCode::CREATED, Json(NoteResponse::from(note))))
|
Ok((StatusCode::CREATED, Json(NoteResponse::from(note))))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,6 +149,18 @@ pub async fn update_note(
|
|||||||
|
|
||||||
let note = state.note_service.update_note(domain_req).await?;
|
let note = state.note_service.update_note(domain_req).await?;
|
||||||
|
|
||||||
|
// Publish event
|
||||||
|
let payload = serde_json::to_vec(¬e).unwrap_or_default();
|
||||||
|
if let Err(e) = state
|
||||||
|
.nats_client
|
||||||
|
.publish("notes.updated", payload.into())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!("Failed to publish notes.updated event: {}", e);
|
||||||
|
} else {
|
||||||
|
tracing::info!("Published notes.updated event for note {}", note.id);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Json(NoteResponse::from(note)))
|
Ok(Json(NoteResponse::from(note)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ pub struct AppState {
|
|||||||
pub note_service: Arc<NoteService>,
|
pub note_service: Arc<NoteService>,
|
||||||
pub tag_service: Arc<TagService>,
|
pub tag_service: Arc<TagService>,
|
||||||
pub user_service: Arc<UserService>,
|
pub user_service: Arc<UserService>,
|
||||||
|
pub nats_client: async_nats::Client,
|
||||||
pub config: Config,
|
pub config: Config,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -25,6 +26,7 @@ impl AppState {
|
|||||||
note_service: Arc<NoteService>,
|
note_service: Arc<NoteService>,
|
||||||
tag_service: Arc<TagService>,
|
tag_service: Arc<TagService>,
|
||||||
user_service: Arc<UserService>,
|
user_service: Arc<UserService>,
|
||||||
|
nats_client: async_nats::Client,
|
||||||
config: Config,
|
config: Config,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -34,6 +36,7 @@ impl AppState {
|
|||||||
note_service,
|
note_service,
|
||||||
tag_service,
|
tag_service,
|
||||||
user_service,
|
user_service,
|
||||||
|
nats_client,
|
||||||
config,
|
config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ pub async fn build_vector_store(
|
|||||||
match provider {
|
match provider {
|
||||||
VectorProvider::Qdrant { url, collection } => {
|
VectorProvider::Qdrant { url, collection } => {
|
||||||
let adapter = crate::vector::qdrant::QdrantVectorAdapter::new(url, collection)?;
|
let adapter = crate::vector::qdrant::QdrantVectorAdapter::new(url, collection)?;
|
||||||
|
adapter.create_collection_if_not_exists().await?;
|
||||||
Ok(Arc::new(adapter))
|
Ok(Arc::new(adapter))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,10 @@ use async_trait::async_trait;
|
|||||||
use notes_domain::errors::{DomainError, DomainResult};
|
use notes_domain::errors::{DomainError, DomainResult};
|
||||||
use notes_domain::ports::VectorStore;
|
use notes_domain::ports::VectorStore;
|
||||||
use qdrant_client::Qdrant;
|
use qdrant_client::Qdrant;
|
||||||
use qdrant_client::qdrant::{PointStruct, SearchPointsBuilder, UpsertPointsBuilder, Value};
|
use qdrant_client::qdrant::{
|
||||||
|
CreateCollectionBuilder, Distance, PointStruct, SearchPointsBuilder, UpsertPointsBuilder,
|
||||||
|
Value, VectorParamsBuilder,
|
||||||
|
};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@@ -23,6 +26,31 @@ impl QdrantVectorAdapter {
|
|||||||
collection_name: collection_name.to_string(),
|
collection_name: collection_name.to_string(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn create_collection_if_not_exists(&self) -> DomainResult<()> {
|
||||||
|
if !self
|
||||||
|
.client
|
||||||
|
.collection_exists(&self.collection_name)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
DomainError::InfrastructureError(format!(
|
||||||
|
"Failed to check collection existence: {}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})?
|
||||||
|
{
|
||||||
|
self.client
|
||||||
|
.create_collection(
|
||||||
|
CreateCollectionBuilder::new(self.collection_name.clone())
|
||||||
|
.vectors_config(VectorParamsBuilder::new(384, Distance::Cosine)),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
DomainError::InfrastructureError(format!("Failed to create collection: {}", e))
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use futures_util::StreamExt;
|
||||||
use notes_domain::services::SmartNoteService;
|
use notes_domain::services::SmartNoteService;
|
||||||
use notes_infra::{
|
use notes_infra::{
|
||||||
DatabaseConfig,
|
DatabaseConfig,
|
||||||
@@ -10,8 +11,18 @@ use crate::config::Config;
|
|||||||
|
|
||||||
mod config;
|
mod config;
|
||||||
|
|
||||||
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(
|
||||||
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||||
|
.unwrap_or_else(|_| "notes_worker=info,notes_infra=info".into()),
|
||||||
|
)
|
||||||
|
.with(tracing_subscriber::fmt::layer())
|
||||||
|
.init();
|
||||||
|
|
||||||
let config = Config::from_env();
|
let config = Config::from_env();
|
||||||
let nats_client = async_nats::connect(&config.broker_url).await?;
|
let nats_client = async_nats::connect(&config.broker_url).await?;
|
||||||
let db_config = DatabaseConfig::new(config.database_url.clone());
|
let db_config = DatabaseConfig::new(config.database_url.clone());
|
||||||
@@ -29,8 +40,27 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
config.embedding_provider
|
config.embedding_provider
|
||||||
);
|
);
|
||||||
|
|
||||||
// subscribe to jobs and process them
|
// Subscribe to note update events
|
||||||
// (Future: consume NATS messages and call smart_service.process_note(¬e))
|
let mut subscriber = nats_client.subscribe("notes.updated").await?;
|
||||||
|
tracing::info!("Worker listening on 'notes.updated'...");
|
||||||
|
|
||||||
|
while let Some(msg) = subscriber.next().await {
|
||||||
|
// Parse message payload (assuming the payload IS the Note JSON)
|
||||||
|
let note_result: Result<notes_domain::Note, _> = serde_json::from_slice(&msg.payload);
|
||||||
|
|
||||||
|
match note_result {
|
||||||
|
Ok(note) => {
|
||||||
|
tracing::info!("Processing smart features for note: {}", note.id);
|
||||||
|
match smart_service.process_note(¬e).await {
|
||||||
|
Ok(_) => tracing::info!("Successfully processed note {}", note.id),
|
||||||
|
Err(e) => tracing::error!("Failed to process note {}: {}", note.id, e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to deserialize note from message: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user