From 1bacb454fad740b48ce5fcad99bcbdd5fd3af9ab Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 26 Dec 2025 00:32:20 +0000 Subject: [PATCH] Refactor (#4) Reviewed-on: https://git.gabrielkaszewski.dev/GKaszewski/k-notes/pulls/4 --- Cargo.lock | 1 + Dockerfile | 3 +- compose.yml | 15 +++ notes-api/src/main.rs | 26 +++-- notes-api/src/nats_broker.rs | 31 +++++ notes-api/src/routes/notes.rs | 30 +---- notes-domain/Cargo.toml | 1 + notes-domain/src/lib.rs | 1 + notes-domain/src/ports.rs | 11 +- notes-domain/src/services.rs | 54 ++++++++- notes-infra/src/note_repository.rs | 180 +++++++++++++++++------------ 11 files changed, 238 insertions(+), 115 deletions(-) create mode 100644 notes-api/src/nats_broker.rs diff --git a/Cargo.lock b/Cargo.lock index 2391d30..f3039ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2207,6 +2207,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", + "tracing", "uuid", ] diff --git a/Dockerfile b/Dockerfile index 20a36be..6756aa9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /app COPY . . # Build the release binary -RUN cargo build --release -p notes-api +RUN cargo build --release -p notes-api -p notes-worker FROM debian:bookworm-slim @@ -14,6 +14,7 @@ WORKDIR /app RUN apt-get update && apt-get install -y libssl3 ca-certificates && rm -rf /var/lib/apt/lists/* COPY --from=builder /app/target/release/notes-api . +COPY --from=builder /app/target/release/notes-worker . # Create data directory for SQLite diff --git a/compose.yml b/compose.yml index 48a543a..f7bcd7f 100644 --- a/compose.yml +++ b/compose.yml @@ -14,6 +14,21 @@ services: volumes: - ./data:/app/data + worker: + build: . + command: ["./notes-worker"] + environment: + - DATABASE_URL=sqlite:///app/data/notes.db + - BROKER_URL=nats://nats:4222 + - QDRANT_URL=http://qdrant:6334 + - EMBEDDING_PROVIDER=fastembed + depends_on: + - backend + - nats + - qdrant + volumes: + - ./data:/app/data + frontend: build: ./k-notes-frontend ports: diff --git a/notes-api/src/main.rs b/notes-api/src/main.rs index f36598c..dde665d 100644 --- a/notes-api/src/main.rs +++ b/notes-api/src/main.rs @@ -18,6 +18,8 @@ mod auth; mod config; mod dto; mod error; +#[cfg(feature = "smart-features")] +mod nats_broker; mod routes; mod state; @@ -79,14 +81,7 @@ async fn main() -> anyhow::Result<()> { .await .map_err(|e| anyhow::anyhow!(e))?; - // Create services - use notes_domain::{NoteService, TagService, UserService}; - let note_service = Arc::new(NoteService::new(note_repo.clone(), tag_repo.clone())); - let tag_service = Arc::new(TagService::new(tag_repo.clone())); - let user_service = Arc::new(UserService::new(user_repo.clone())); - - // Connect to NATS - // Connect to NATS + // Connect to NATS (before creating services that depend on it) #[cfg(feature = "smart-features")] let nats_client = { tracing::info!("Connecting to NATS: {}", config.broker_url); @@ -95,6 +90,21 @@ async fn main() -> anyhow::Result<()> { .map_err(|e| anyhow::anyhow!("NATS connection failed: {}", e))? }; + // Create services + use notes_domain::{NoteService, TagService, UserService}; + + // Build NoteService with optional MessageBroker + #[cfg(feature = "smart-features")] + let note_service = { + let broker = Arc::new(nats_broker::NatsMessageBroker::new(nats_client.clone())); + Arc::new(NoteService::new(note_repo.clone(), tag_repo.clone()).with_message_broker(broker)) + }; + #[cfg(not(feature = "smart-features"))] + let note_service = Arc::new(NoteService::new(note_repo.clone(), tag_repo.clone())); + + let tag_service = Arc::new(TagService::new(tag_repo.clone())); + let user_service = Arc::new(UserService::new(user_repo.clone())); + // Create application state let state = AppState::new( note_repo, diff --git a/notes-api/src/nats_broker.rs b/notes-api/src/nats_broker.rs new file mode 100644 index 0000000..431bf34 --- /dev/null +++ b/notes-api/src/nats_broker.rs @@ -0,0 +1,31 @@ +//! NATS message broker adapter for domain MessageBroker port + +use async_trait::async_trait; +use notes_domain::{DomainError, DomainResult, MessageBroker, Note}; + +/// NATS adapter implementing the MessageBroker port +pub struct NatsMessageBroker { + client: async_nats::Client, +} + +impl NatsMessageBroker { + pub fn new(client: async_nats::Client) -> Self { + Self { client } + } +} + +#[async_trait] +impl MessageBroker for NatsMessageBroker { + async fn publish_note_updated(&self, note: &Note) -> DomainResult<()> { + let payload = serde_json::to_vec(note).map_err(|e| { + DomainError::RepositoryError(format!("Failed to serialize note: {}", e)) + })?; + + self.client + .publish("notes.updated", payload.into()) + .await + .map_err(|e| DomainError::RepositoryError(format!("Failed to publish event: {}", e)))?; + + Ok(()) + } +} diff --git a/notes-api/src/routes/notes.rs b/notes-api/src/routes/notes.rs index ed6f238..8479ebe 100644 --- a/notes-api/src/routes/notes.rs +++ b/notes-api/src/routes/notes.rs @@ -82,20 +82,7 @@ pub async fn create_note( let note = state.note_service.create_note(domain_req).await?; - // Publish event - #[cfg(feature = "smart-features")] - { - let payload = serde_json::to_vec(¬e).unwrap_or_default(); - if let Err(e) = state - .nats_client - .publish("notes.updated", payload.into()) - .await - { - tracing::error!("Failed to publish notes.updated event: {}", e); - } else { - tracing::info!("Published notes.updated event for note {}", note.id); - } - } + // Event publishing is now handled in NoteService via MessageBroker Ok((StatusCode::CREATED, Json(NoteResponse::from(note)))) } @@ -152,20 +139,7 @@ pub async fn update_note( let note = state.note_service.update_note(domain_req).await?; - // Publish event - #[cfg(feature = "smart-features")] - { - let payload = serde_json::to_vec(¬e).unwrap_or_default(); - if let Err(e) = state - .nats_client - .publish("notes.updated", payload.into()) - .await - { - tracing::error!("Failed to publish notes.updated event: {}", e); - } else { - tracing::info!("Published notes.updated event for note {}", note.id); - } - } + // Event publishing is now handled in NoteService via MessageBroker Ok(Json(NoteResponse::from(note))) } diff --git a/notes-domain/Cargo.toml b/notes-domain/Cargo.toml index 6894357..a51a4cc 100644 --- a/notes-domain/Cargo.toml +++ b/notes-domain/Cargo.toml @@ -10,6 +10,7 @@ chrono = { version = "0.4.42", features = ["serde"] } serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.146" thiserror = "2.0.17" +tracing = "0.1" uuid = { version = "1.19.0", features = ["v4", "serde"] } [dev-dependencies] diff --git a/notes-domain/src/lib.rs b/notes-domain/src/lib.rs index ca41f97..d4b8b7a 100644 --- a/notes-domain/src/lib.rs +++ b/notes-domain/src/lib.rs @@ -17,5 +17,6 @@ pub mod services; // Re-export commonly used types at crate root pub use entities::{MAX_TAGS_PER_NOTE, Note, NoteFilter, NoteVersion, Tag, User}; pub use errors::{DomainError, DomainResult}; +pub use ports::MessageBroker; pub use repositories::{NoteRepository, TagRepository, UserRepository}; pub use services::{CreateNoteRequest, NoteService, TagService, UpdateNoteRequest, UserService}; diff --git a/notes-domain/src/ports.rs b/notes-domain/src/ports.rs index 35f24bd..11a7c51 100644 --- a/notes-domain/src/ports.rs +++ b/notes-domain/src/ports.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use uuid::Uuid; -use crate::entities::NoteLink; +use crate::entities::{Note, NoteLink}; use crate::errors::DomainResult; /// Defines how to generate vector embeddings from text. @@ -34,3 +34,12 @@ pub trait LinkRepository: Send + Sync { /// Get links for a specific source note. async fn get_links_for_note(&self, source_note_id: Uuid) -> DomainResult>; } + +/// Port for publishing domain events to a message broker. +/// Enables the Service layer to trigger background processing +/// without coupling to a specific messaging implementation. +#[async_trait] +pub trait MessageBroker: Send + Sync { + /// Publish an event when a note is created or updated. + async fn publish_note_updated(&self, note: &Note) -> DomainResult<()>; +} diff --git a/notes-domain/src/services.rs b/notes-domain/src/services.rs index a60cde6..e5c0384 100644 --- a/notes-domain/src/services.rs +++ b/notes-domain/src/services.rs @@ -8,6 +8,7 @@ use uuid::Uuid; use crate::entities::{MAX_TAGS_PER_NOTE, Note, NoteFilter, NoteVersion, Tag, User}; use crate::errors::{DomainError, DomainResult}; +use crate::ports::MessageBroker; use crate::repositories::{NoteRepository, TagRepository, UserRepository}; /// Request to create a new note @@ -38,6 +39,7 @@ pub struct UpdateNoteRequest { pub struct NoteService { note_repo: Arc, tag_repo: Arc, + message_broker: Option>, } impl NoteService { @@ -45,6 +47,24 @@ impl NoteService { Self { note_repo, tag_repo, + message_broker: None, + } + } + + /// Builder method to set the message broker + pub fn with_message_broker(mut self, broker: Arc) -> Self { + self.message_broker = Some(broker); + self + } + + /// Helper to publish note update events + async fn publish_note_event(&self, note: &Note) { + if let Some(ref broker) = self.message_broker { + if let Err(e) = broker.publish_note_updated(note).await { + tracing::error!(note_id = %note.id, "Failed to publish note event: {}", e); + } else { + tracing::info!(note_id = %note.id, "Published note.updated event"); + } } } @@ -81,6 +101,9 @@ impl NoteService { self.tag_repo.add_to_note(tag.id, note.id).await?; } + // Publish event for smart features processing + self.publish_note_event(¬e).await; + Ok(note) } @@ -149,6 +172,10 @@ impl NoteService { } self.note_repo.save(¬e).await?; + + // Publish event for smart features processing + self.publish_note_event(¬e).await; + Ok(note) } @@ -217,14 +244,31 @@ impl NoteService { } /// Get or create a tag by name + /// + /// Handles race conditions gracefully: if a concurrent request creates + /// the same tag, we catch the unique constraint violation and retry the lookup. async fn get_or_create_tag(&self, user_id: Uuid, name: &str) -> DomainResult { let name = name.trim().to_lowercase(); + + // First, try to find existing tag if let Some(tag) = self.tag_repo.find_by_name(user_id, &name).await? { - Ok(tag) - } else { - let tag = Tag::new(name, user_id); - self.tag_repo.save(&tag).await?; - Ok(tag) + return Ok(tag); + } + + // Tag doesn't exist, try to create it + let tag = Tag::new(name.clone(), user_id); + match self.tag_repo.save(&tag).await { + Ok(()) => Ok(tag), + Err(DomainError::RepositoryError(ref e)) if e.contains("UNIQUE constraint") => { + // Race condition: another request created the tag between our check and save + // Retry the lookup + tracing::debug!(tag_name = %name, "Tag creation race condition detected, retrying lookup"); + self.tag_repo + .find_by_name(user_id, &name) + .await? + .ok_or_else(|| DomainError::validation("Tag creation race condition")) + } + Err(e) => Err(e), } } } diff --git a/notes-infra/src/note_repository.rs b/notes-infra/src/note_repository.rs index 8383b64..ec2578a 100644 --- a/notes-infra/src/note_repository.rs +++ b/notes-infra/src/note_repository.rs @@ -2,30 +2,25 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use sqlx::{FromRow, SqlitePool}; +use sqlx::{FromRow, QueryBuilder, Sqlite, SqlitePool}; use uuid::Uuid; -use notes_domain::{ - DomainError, DomainResult, Note, NoteFilter, NoteRepository, NoteVersion, Tag, TagRepository, -}; - -use crate::tag_repository::SqliteTagRepository; +use notes_domain::{DomainError, DomainResult, Note, NoteFilter, NoteRepository, NoteVersion, Tag}; /// SQLite adapter for NoteRepository pub struct SqliteNoteRepository { pool: SqlitePool, - tag_repo: SqliteTagRepository, } impl SqliteNoteRepository { pub fn new(pool: SqlitePool) -> Self { - let tag_repo = SqliteTagRepository::new(pool.clone()); - Self { pool, tag_repo } + Self { pool } } } +/// Row with JSON-aggregated tags for single-query fetching #[derive(Debug, FromRow)] -struct NoteRow { +struct NoteRowWithTags { id: String, user_id: String, title: String, @@ -35,27 +30,59 @@ struct NoteRow { is_archived: i32, created_at: String, updated_at: String, + tags_json: String, } -impl NoteRow { - fn try_into_note(self, tags: Vec) -> Result { +/// Helper to parse datetime strings +fn parse_datetime(s: &str) -> Result, DomainError> { + DateTime::parse_from_rfc3339(s) + .map(|dt| dt.with_timezone(&Utc)) + .or_else(|_| { + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map(|dt| dt.and_utc()) + }) + .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime: {}", e))) +} + +/// Helper to parse tags from JSON array +fn parse_tags_json(tags_json: &str) -> Result, DomainError> { + // SQLite returns [null] for LEFT JOIN with no matches + let parsed: Vec = serde_json::from_str(tags_json) + .map_err(|e| DomainError::RepositoryError(format!("Failed to parse tags JSON: {}", e)))?; + + parsed + .into_iter() + .filter(|v| !v.is_null()) + .map(|v| { + let id_str = v["id"] + .as_str() + .ok_or_else(|| DomainError::RepositoryError("Missing tag id".to_string()))?; + let name = v["name"] + .as_str() + .ok_or_else(|| DomainError::RepositoryError("Missing tag name".to_string()))?; + let user_id_str = v["user_id"] + .as_str() + .ok_or_else(|| DomainError::RepositoryError("Missing tag user_id".to_string()))?; + + let id = Uuid::parse_str(id_str) + .map_err(|e| DomainError::RepositoryError(format!("Invalid tag UUID: {}", e)))?; + let user_id = Uuid::parse_str(user_id_str) + .map_err(|e| DomainError::RepositoryError(format!("Invalid tag user_id: {}", e)))?; + + Ok(Tag::with_id(id, name.to_string(), user_id)) + }) + .collect() +} + +impl NoteRowWithTags { + fn try_into_note(self) -> Result { let id = Uuid::parse_str(&self.id) .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; let user_id = Uuid::parse_str(&self.user_id) .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; - let parse_datetime = |s: &str| -> Result, DomainError> { - DateTime::parse_from_rfc3339(s) - .map(|dt| dt.with_timezone(&Utc)) - .or_else(|_| { - chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") - .map(|dt| dt.and_utc()) - }) - .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime: {}", e))) - }; - let created_at = parse_datetime(&self.created_at)?; let updated_at = parse_datetime(&self.updated_at)?; + let tags = parse_tags_json(&self.tags_json)?; Ok(Note { id, @@ -110,10 +137,20 @@ impl NoteVersionRow { impl NoteRepository for SqliteNoteRepository { async fn find_by_id(&self, id: Uuid) -> DomainResult> { let id_str = id.to_string(); - let row: Option = sqlx::query_as( + let row: Option = sqlx::query_as( r#" - SELECT id, user_id, title, content, color, is_pinned, is_archived, created_at, updated_at - FROM notes WHERE id = ? + SELECT n.id, n.user_id, n.title, n.content, n.color, n.is_pinned, n.is_archived, + n.created_at, n.updated_at, + json_group_array( + CASE WHEN t.id IS NOT NULL + THEN json_object('id', t.id, 'name', t.name, 'user_id', t.user_id) + ELSE NULL END + ) as tags_json + FROM notes n + LEFT JOIN note_tags nt ON n.id = nt.note_id + LEFT JOIN tags t ON nt.tag_id = t.id + WHERE n.id = ? + GROUP BY n.id "#, ) .bind(&id_str) @@ -122,10 +159,7 @@ impl NoteRepository for SqliteNoteRepository { .map_err(|e| DomainError::RepositoryError(e.to_string()))?; match row { - Some(row) => { - let tags = self.tag_repo.find_by_note(id).await?; - Ok(Some(row.try_into_note(tags)?)) - } + Some(row) => Ok(Some(row.try_into_note()?)), None => Ok(None), } } @@ -133,50 +167,52 @@ impl NoteRepository for SqliteNoteRepository { async fn find_by_user(&self, user_id: Uuid, filter: NoteFilter) -> DomainResult> { let user_id_str = user_id.to_string(); - // Build dynamic query based on filter - let mut query = String::from( + // Build dynamic query using QueryBuilder for safety + let mut query_builder: QueryBuilder = QueryBuilder::new( r#" - SELECT id, user_id, title, content, color, is_pinned, is_archived, created_at, updated_at - FROM notes - WHERE user_id = ? + SELECT n.id, n.user_id, n.title, n.content, n.color, n.is_pinned, n.is_archived, + n.created_at, n.updated_at, + json_group_array( + CASE WHEN t.id IS NOT NULL + THEN json_object('id', t.id, 'name', t.name, 'user_id', t.user_id) + ELSE NULL END + ) as tags_json + FROM notes n + LEFT JOIN note_tags nt ON n.id = nt.note_id + LEFT JOIN tags t ON nt.tag_id = t.id + WHERE n.user_id = "#, ); + query_builder.push_bind(user_id_str); if let Some(pinned) = filter.is_pinned { - query.push_str(&format!(" AND is_pinned = {}", if pinned { 1 } else { 0 })); + query_builder + .push(" AND n.is_pinned = ") + .push_bind(if pinned { 1i32 } else { 0i32 }); } if let Some(archived) = filter.is_archived { - query.push_str(&format!( - " AND is_archived = {}", - if archived { 1 } else { 0 } - )); + query_builder + .push(" AND n.is_archived = ") + .push_bind(if archived { 1i32 } else { 0i32 }); } if let Some(tag_id) = filter.tag_id { - query.push_str(&format!( - " AND id IN (SELECT note_id FROM note_tags WHERE tag_id = '{}')", - tag_id - )); + query_builder + .push(" AND n.id IN (SELECT note_id FROM note_tags WHERE tag_id = ") + .push_bind(tag_id.to_string()) + .push(")"); } - query.push_str(" ORDER BY is_pinned DESC, updated_at DESC"); + query_builder.push(" GROUP BY n.id ORDER BY n.is_pinned DESC, n.updated_at DESC"); - let rows: Vec = sqlx::query_as(&query) - .bind(&user_id_str) + let rows: Vec = query_builder + .build_query_as() .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - let mut notes = Vec::with_capacity(rows.len()); - for row in rows { - let note_id = Uuid::parse_str(&row.id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; - let tags = self.tag_repo.find_by_note(note_id).await?; - notes.push(row.try_into_note(tags)?); - } - - Ok(notes) + rows.into_iter().map(|row| row.try_into_note()).collect() } async fn save(&self, note: &Note) -> DomainResult<()> { @@ -229,26 +265,34 @@ impl NoteRepository for SqliteNoteRepository { async fn search(&self, user_id: Uuid, query: &str) -> DomainResult> { let user_id_str = user_id.to_string(); - let like_query = format!("%{}%", query); - // Use FTS5 for full-text search OR tag name match - let rows: Vec = sqlx::query_as( + // Use FTS5 for full-text search OR tag name match, with JSON-aggregated tags + let rows: Vec = sqlx::query_as( r#" - SELECT DISTINCT n.id, n.user_id, n.title, n.content, n.color, n.is_pinned, n.is_archived, n.created_at, n.updated_at + SELECT n.id, n.user_id, n.title, n.content, n.color, n.is_pinned, n.is_archived, + n.created_at, n.updated_at, + json_group_array( + CASE WHEN t.id IS NOT NULL + THEN json_object('id', t.id, 'name', t.name, 'user_id', t.user_id) + ELSE NULL END + ) as tags_json FROM notes n + LEFT JOIN note_tags nt ON n.id = nt.note_id + LEFT JOIN tags t ON nt.tag_id = t.id WHERE n.user_id = ? AND ( n.rowid IN (SELECT rowid FROM notes_fts WHERE notes_fts MATCH ?) OR EXISTS ( - SELECT 1 FROM note_tags nt - JOIN tags t ON nt.tag_id = t.id - WHERE nt.note_id = n.id AND t.name LIKE ? + SELECT 1 FROM note_tags nt2 + JOIN tags t2 ON nt2.tag_id = t2.id + WHERE nt2.note_id = n.id AND t2.name LIKE ? ) ) + GROUP BY n.id ORDER BY n.updated_at DESC - "# + "#, ) .bind(&user_id_str) .bind(query) @@ -257,15 +301,7 @@ impl NoteRepository for SqliteNoteRepository { .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - let mut notes = Vec::with_capacity(rows.len()); - for row in rows { - let note_id = Uuid::parse_str(&row.id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; - let tags = self.tag_repo.find_by_note(note_id).await?; - notes.push(row.try_into_note(tags)?); - } - - Ok(notes) + rows.into_iter().map(|row| row.try_into_note()).collect() } async fn save_version(&self, version: &NoteVersion) -> DomainResult<()> {