First batch of smart stuff
This commit is contained in:
@@ -3,11 +3,17 @@ name = "notes-worker"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
default = ["sqlite", "smart-features"]
|
||||
sqlite = ["notes-infra/sqlite", "sqlx/sqlite"]
|
||||
# postgres = ["notes-infra/postgres", "sqlx/postgres"]
|
||||
smart-features = ["notes-infra/smart-features"]
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.100"
|
||||
async-nats = "0.45.0"
|
||||
notes-domain = { path = "../notes-domain" }
|
||||
notes-infra = { path = "../notes-infra" }
|
||||
notes-infra = { path = "../notes-infra", default-features = false }
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_json = "1.0.146"
|
||||
tokio = { version = "1.48.0", features = ["full"] }
|
||||
|
||||
@@ -1,7 +1,14 @@
|
||||
#[cfg(feature = "smart-features")]
|
||||
use notes_infra::factory::{EmbeddingProvider, VectorProvider};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub broker_url: String,
|
||||
pub database_url: String,
|
||||
#[cfg(feature = "smart-features")]
|
||||
pub embedding_provider: EmbeddingProvider,
|
||||
#[cfg(feature = "smart-features")]
|
||||
pub vector_provider: VectorProvider,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@@ -9,6 +16,13 @@ impl Default for Config {
|
||||
Self {
|
||||
broker_url: "nats://localhost:4222".to_string(),
|
||||
database_url: "sqlite::memory:".to_string(),
|
||||
#[cfg(feature = "smart-features")]
|
||||
embedding_provider: EmbeddingProvider::FastEmbed,
|
||||
#[cfg(feature = "smart-features")]
|
||||
vector_provider: VectorProvider::Qdrant {
|
||||
url: "http://localhost:6334".to_string(),
|
||||
collection: "notes".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -17,9 +31,34 @@ impl Config {
|
||||
pub fn from_env() -> Self {
|
||||
let _ = dotenvy::dotenv();
|
||||
|
||||
#[cfg(feature = "smart-features")]
|
||||
let embedding_provider = match std::env::var("EMBEDDING_PROVIDER")
|
||||
.unwrap_or_default()
|
||||
.as_str()
|
||||
{
|
||||
_ => EmbeddingProvider::FastEmbed,
|
||||
};
|
||||
|
||||
#[cfg(feature = "smart-features")]
|
||||
let vector_provider = match std::env::var("VECTOR_PROVIDER")
|
||||
.unwrap_or_default()
|
||||
.as_str()
|
||||
{
|
||||
_ => VectorProvider::Qdrant {
|
||||
url: std::env::var("QDRANT_URL")
|
||||
.unwrap_or_else(|_| "http://localhost:6334".to_string()),
|
||||
collection: std::env::var("QDRANT_COLLECTION")
|
||||
.unwrap_or_else(|_| "notes".to_string()),
|
||||
},
|
||||
};
|
||||
|
||||
Self {
|
||||
broker_url: std::env::var("BROKER_URL").unwrap_or("nats://localhost:4222".to_string()),
|
||||
database_url: std::env::var("DATABASE_URL").unwrap_or("sqlite::memory:".to_string()),
|
||||
#[cfg(feature = "smart-features")]
|
||||
embedding_provider,
|
||||
#[cfg(feature = "smart-features")]
|
||||
vector_provider,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,16 +1,77 @@
|
||||
use notes_infra::{DatabaseConfig, create_pool};
|
||||
use futures_util::StreamExt;
|
||||
#[cfg(feature = "smart-features")]
|
||||
use notes_domain::services::SmartNoteService;
|
||||
#[cfg(feature = "smart-features")]
|
||||
use notes_infra::{
|
||||
DatabaseConfig,
|
||||
factory::{
|
||||
build_database_pool, build_embedding_generator, build_link_repository, build_vector_store,
|
||||
},
|
||||
};
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
mod config;
|
||||
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let config = Config::from_env();
|
||||
let nats_client = async_nats::connect(config.broker_url).await?;
|
||||
let db_config = DatabaseConfig::new(config.database_url);
|
||||
let db_pool = create_pool(&db_config).await?;
|
||||
tracing_subscriber::registry()
|
||||
.with(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "notes_worker=info,notes_infra=info".into()),
|
||||
)
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
|
||||
let config = Config::from_env();
|
||||
let nats_client = async_nats::connect(&config.broker_url).await?;
|
||||
|
||||
#[cfg(feature = "smart-features")]
|
||||
{
|
||||
let db_config = DatabaseConfig::new(config.database_url.clone());
|
||||
let db_pool = build_database_pool(&db_config).await?;
|
||||
|
||||
// Initialize smart feature adapters
|
||||
let embedding_generator = build_embedding_generator(&config.embedding_provider).await?;
|
||||
let vector_store = build_vector_store(&config.vector_provider).await?;
|
||||
let link_repo = build_link_repository(&db_pool).await?;
|
||||
|
||||
// Create the service
|
||||
let smart_service = SmartNoteService::new(embedding_generator, vector_store, link_repo);
|
||||
tracing::info!(
|
||||
"SmartNoteService initialized successfully with {:?}",
|
||||
config.embedding_provider
|
||||
);
|
||||
|
||||
// Subscribe to note update events
|
||||
let mut subscriber = nats_client.subscribe("notes.updated").await?;
|
||||
tracing::info!("Worker listening on 'notes.updated'...");
|
||||
|
||||
while let Some(msg) = subscriber.next().await {
|
||||
// Parse message payload (assuming the payload IS the Note JSON)
|
||||
let note_result: Result<notes_domain::Note, _> = serde_json::from_slice(&msg.payload);
|
||||
|
||||
match note_result {
|
||||
Ok(note) => {
|
||||
tracing::info!("Processing smart features for note: {}", note.id);
|
||||
match smart_service.process_note(¬e).await {
|
||||
Ok(_) => tracing::info!("Successfully processed note {}", note.id),
|
||||
Err(e) => tracing::error!("Failed to process note {}: {}", note.id, e),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to deserialize note from message: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "smart-features"))]
|
||||
{
|
||||
tracing::info!("Smart features are disabled. Worker will exit.");
|
||||
}
|
||||
|
||||
// subscribe to jobs and process them
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user