@@ -8,6 +8,7 @@ use uuid::Uuid;
|
||||
|
||||
use crate::entities::{MAX_TAGS_PER_NOTE, Note, NoteFilter, NoteVersion, Tag, User};
|
||||
use crate::errors::{DomainError, DomainResult};
|
||||
use crate::ports::MessageBroker;
|
||||
use crate::repositories::{NoteRepository, TagRepository, UserRepository};
|
||||
|
||||
/// Request to create a new note
|
||||
@@ -38,6 +39,7 @@ pub struct UpdateNoteRequest {
|
||||
pub struct NoteService {
|
||||
note_repo: Arc<dyn NoteRepository>,
|
||||
tag_repo: Arc<dyn TagRepository>,
|
||||
message_broker: Option<Arc<dyn MessageBroker>>,
|
||||
}
|
||||
|
||||
impl NoteService {
|
||||
@@ -45,6 +47,24 @@ impl NoteService {
|
||||
Self {
|
||||
note_repo,
|
||||
tag_repo,
|
||||
message_broker: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder method to set the message broker
|
||||
pub fn with_message_broker(mut self, broker: Arc<dyn MessageBroker>) -> Self {
|
||||
self.message_broker = Some(broker);
|
||||
self
|
||||
}
|
||||
|
||||
/// Helper to publish note update events
|
||||
async fn publish_note_event(&self, note: &Note) {
|
||||
if let Some(ref broker) = self.message_broker {
|
||||
if let Err(e) = broker.publish_note_updated(note).await {
|
||||
tracing::error!(note_id = %note.id, "Failed to publish note event: {}", e);
|
||||
} else {
|
||||
tracing::info!(note_id = %note.id, "Published note.updated event");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,6 +101,9 @@ impl NoteService {
|
||||
self.tag_repo.add_to_note(tag.id, note.id).await?;
|
||||
}
|
||||
|
||||
// Publish event for smart features processing
|
||||
self.publish_note_event(¬e).await;
|
||||
|
||||
Ok(note)
|
||||
}
|
||||
|
||||
@@ -149,6 +172,10 @@ impl NoteService {
|
||||
}
|
||||
|
||||
self.note_repo.save(¬e).await?;
|
||||
|
||||
// Publish event for smart features processing
|
||||
self.publish_note_event(¬e).await;
|
||||
|
||||
Ok(note)
|
||||
}
|
||||
|
||||
@@ -217,14 +244,31 @@ impl NoteService {
|
||||
}
|
||||
|
||||
/// Get or create a tag by name
|
||||
///
|
||||
/// Handles race conditions gracefully: if a concurrent request creates
|
||||
/// the same tag, we catch the unique constraint violation and retry the lookup.
|
||||
async fn get_or_create_tag(&self, user_id: Uuid, name: &str) -> DomainResult<Tag> {
|
||||
let name = name.trim().to_lowercase();
|
||||
|
||||
// First, try to find existing tag
|
||||
if let Some(tag) = self.tag_repo.find_by_name(user_id, &name).await? {
|
||||
Ok(tag)
|
||||
} else {
|
||||
let tag = Tag::new(name, user_id);
|
||||
self.tag_repo.save(&tag).await?;
|
||||
Ok(tag)
|
||||
return Ok(tag);
|
||||
}
|
||||
|
||||
// Tag doesn't exist, try to create it
|
||||
let tag = Tag::new(name.clone(), user_id);
|
||||
match self.tag_repo.save(&tag).await {
|
||||
Ok(()) => Ok(tag),
|
||||
Err(DomainError::RepositoryError(ref e)) if e.contains("UNIQUE constraint") => {
|
||||
// Race condition: another request created the tag between our check and save
|
||||
// Retry the lookup
|
||||
tracing::debug!(tag_name = %name, "Tag creation race condition detected, retrying lookup");
|
||||
self.tag_repo
|
||||
.find_by_name(user_id, &name)
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::validation("Tag creation race condition"))
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user