Newtypes and broker refactor
Reviewed-on: #10
This commit was merged in pull request #10.
This commit is contained in:
@@ -4,10 +4,11 @@ version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
default = ["sqlite", "smart-features"]
|
||||
default = ["sqlite", "smart-features", "broker-nats"]
|
||||
sqlite = ["sqlx/sqlite", "tower-sessions-sqlx-store/sqlite"]
|
||||
postgres = ["sqlx/postgres", "tower-sessions-sqlx-store/postgres"]
|
||||
smart-features = ["dep:fastembed", "dep:qdrant-client"]
|
||||
broker-nats = ["dep:async-nats", "dep:futures-util"]
|
||||
|
||||
[dependencies]
|
||||
notes-domain = { path = "../notes-domain" }
|
||||
@@ -23,3 +24,7 @@ tower-sessions-sqlx-store = { version = "0.15.0", default-features = false }
|
||||
fastembed = { version = "5.4", optional = true }
|
||||
qdrant-client = { version = "1.16", optional = true }
|
||||
serde_json = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
async-nats = { version = "0.45", optional = true }
|
||||
futures-util = { version = "0.3", optional = true }
|
||||
futures-core = "0.3"
|
||||
|
||||
7
notes-infra/src/broker/mod.rs
Normal file
7
notes-infra/src/broker/mod.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
//! Message broker adapters for various backends.
|
||||
//!
|
||||
//! This module provides implementations of the `MessageBroker` port
|
||||
//! for different messaging backends.
|
||||
|
||||
#[cfg(feature = "broker-nats")]
|
||||
pub mod nats;
|
||||
66
notes-infra/src/broker/nats.rs
Normal file
66
notes-infra/src/broker/nats.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
//! NATS message broker adapter
|
||||
//!
|
||||
//! Implements the `MessageBroker` port for NATS messaging.
|
||||
|
||||
use std::pin::Pin;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures_util::StreamExt;
|
||||
use notes_domain::{DomainError, DomainResult, MessageBroker, Note};
|
||||
|
||||
/// NATS adapter implementing the MessageBroker port.
|
||||
pub struct NatsMessageBroker {
|
||||
client: async_nats::Client,
|
||||
}
|
||||
|
||||
impl NatsMessageBroker {
|
||||
/// Create a new NATS message broker by connecting to the given URL.
|
||||
pub async fn connect(url: &str) -> Result<Self, async_nats::ConnectError> {
|
||||
let client = async_nats::connect(url).await?;
|
||||
Ok(Self { client })
|
||||
}
|
||||
|
||||
/// Create a NATS message broker from an existing client.
|
||||
pub fn from_client(client: async_nats::Client) -> Self {
|
||||
Self { client }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MessageBroker for NatsMessageBroker {
|
||||
async fn publish_note_updated(&self, note: &Note) -> DomainResult<()> {
|
||||
let payload = serde_json::to_vec(note).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Failed to serialize note: {}", e))
|
||||
})?;
|
||||
|
||||
self.client
|
||||
.publish("notes.updated", payload.into())
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Failed to publish event: {}", e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn subscribe_note_updates(
|
||||
&self,
|
||||
) -> DomainResult<Pin<Box<dyn futures_core::Stream<Item = Note> + Send>>> {
|
||||
let subscriber = self
|
||||
.client
|
||||
.subscribe("notes.updated")
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Failed to subscribe: {}", e)))?;
|
||||
|
||||
// Transform the NATS message stream into a Note stream
|
||||
let note_stream = subscriber.filter_map(|msg| async move {
|
||||
match serde_json::from_slice::<Note>(&msg.payload) {
|
||||
Ok(note) => Some(note),
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to deserialize note from message: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Box::pin(note_stream))
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,37 @@ pub async fn build_vector_store(
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration for message broker providers.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum BrokerProvider {
|
||||
/// NATS message broker (requires `broker-nats` feature).
|
||||
#[cfg(feature = "broker-nats")]
|
||||
Nats { url: String },
|
||||
/// No message broker (messaging disabled).
|
||||
None,
|
||||
}
|
||||
|
||||
/// Build a message broker based on the provider configuration.
|
||||
/// Returns `None` if `BrokerProvider::None` is specified.
|
||||
pub async fn build_message_broker(
|
||||
provider: &BrokerProvider,
|
||||
) -> FactoryResult<Option<Arc<dyn notes_domain::MessageBroker>>> {
|
||||
match provider {
|
||||
#[cfg(feature = "broker-nats")]
|
||||
BrokerProvider::Nats { url } => {
|
||||
let broker = crate::broker::nats::NatsMessageBroker::connect(url)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
FactoryError::Infrastructure(notes_domain::DomainError::RepositoryError(
|
||||
format!("NATS connection failed: {}", e),
|
||||
))
|
||||
})?;
|
||||
Ok(Some(Arc::new(broker)))
|
||||
}
|
||||
BrokerProvider::None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub async fn build_link_repository(
|
||||
pool: &DatabasePool,
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
//! - [`db::create_pool`] - Create a database connection pool
|
||||
//! - [`db::run_migrations`] - Run database migrations
|
||||
|
||||
#[cfg(feature = "broker-nats")]
|
||||
pub mod broker;
|
||||
pub mod db;
|
||||
#[cfg(feature = "smart-features")]
|
||||
pub mod embeddings;
|
||||
|
||||
@@ -5,7 +5,10 @@ use chrono::{DateTime, Utc};
|
||||
use sqlx::{FromRow, QueryBuilder, Sqlite, SqlitePool};
|
||||
use uuid::Uuid;
|
||||
|
||||
use notes_domain::{DomainError, DomainResult, Note, NoteFilter, NoteRepository, NoteVersion, Tag};
|
||||
use notes_domain::{
|
||||
DomainError, DomainResult, Note, NoteFilter, NoteRepository, NoteTitle, NoteVersion, Tag,
|
||||
TagName,
|
||||
};
|
||||
|
||||
/// SQLite adapter for NoteRepository
|
||||
pub struct SqliteNoteRepository {
|
||||
@@ -23,7 +26,7 @@ impl SqliteNoteRepository {
|
||||
struct NoteRowWithTags {
|
||||
id: String,
|
||||
user_id: String,
|
||||
title: String,
|
||||
title: Option<String>, // Title can be NULL in the database
|
||||
content: String,
|
||||
color: String,
|
||||
is_pinned: i32,
|
||||
@@ -68,7 +71,12 @@ fn parse_tags_json(tags_json: &str) -> Result<Vec<Tag>, DomainError> {
|
||||
let user_id = Uuid::parse_str(user_id_str)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid tag user_id: {}", e)))?;
|
||||
|
||||
Ok(Tag::with_id(id, name.to_string(), user_id))
|
||||
// Parse TagName from stored string
|
||||
let tag_name = TagName::try_from(name.to_string()).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Invalid tag name in DB: {}", e))
|
||||
})?;
|
||||
|
||||
Ok(Tag::with_id(id, tag_name, user_id))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
@@ -84,10 +92,18 @@ impl NoteRowWithTags {
|
||||
let updated_at = parse_datetime(&self.updated_at)?;
|
||||
let tags = parse_tags_json(&self.tags_json)?;
|
||||
|
||||
// Parse optional title - empty string or NULL maps to None
|
||||
let title: Option<NoteTitle> = match self.title {
|
||||
Some(t) if !t.trim().is_empty() => Some(NoteTitle::try_from(t).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Invalid title in DB: {}", e))
|
||||
})?),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
Ok(Note {
|
||||
id,
|
||||
user_id,
|
||||
title: self.title,
|
||||
title,
|
||||
content: self.content,
|
||||
color: self.color,
|
||||
is_pinned: self.is_pinned != 0,
|
||||
@@ -103,7 +119,7 @@ impl NoteRowWithTags {
|
||||
struct NoteVersionRow {
|
||||
id: String,
|
||||
note_id: String,
|
||||
title: String,
|
||||
title: Option<String>, // Title can be NULL
|
||||
content: String,
|
||||
created_at: String,
|
||||
}
|
||||
@@ -126,7 +142,7 @@ impl NoteVersionRow {
|
||||
Ok(NoteVersion {
|
||||
id,
|
||||
note_id,
|
||||
title: self.title,
|
||||
title: self.title, // Already Option<String>
|
||||
content: self.content,
|
||||
created_at,
|
||||
})
|
||||
@@ -222,6 +238,8 @@ impl NoteRepository for SqliteNoteRepository {
|
||||
let is_archived: i32 = if note.is_archived { 1 } else { 0 };
|
||||
let created_at = note.created_at.to_rfc3339();
|
||||
let updated_at = note.updated_at.to_rfc3339();
|
||||
// Convert Option<NoteTitle> to Option<&str> for binding
|
||||
let title_str: Option<&str> = note.title.as_ref().map(|t| t.as_ref());
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
@@ -238,7 +256,7 @@ impl NoteRepository for SqliteNoteRepository {
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&user_id)
|
||||
.bind(¬e.title)
|
||||
.bind(title_str)
|
||||
.bind(¬e.content)
|
||||
.bind(¬e.color)
|
||||
.bind(is_pinned)
|
||||
|
||||
@@ -4,7 +4,7 @@ use async_trait::async_trait;
|
||||
use sqlx::{FromRow, SqlitePool};
|
||||
use uuid::Uuid;
|
||||
|
||||
use notes_domain::{DomainError, DomainResult, Tag, TagRepository};
|
||||
use notes_domain::{DomainError, DomainResult, Tag, TagName, TagRepository};
|
||||
|
||||
/// SQLite adapter for TagRepository
|
||||
pub struct SqliteTagRepository {
|
||||
@@ -33,7 +33,11 @@ impl TryFrom<TagRow> for Tag {
|
||||
let user_id = Uuid::parse_str(&row.user_id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?;
|
||||
|
||||
Ok(Tag::with_id(id, row.name, user_id))
|
||||
// Parse TagName from stored string - was validated when originally stored
|
||||
let name = TagName::try_from(row.name)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid tag name in DB: {}", e)))?;
|
||||
|
||||
Ok(Tag::with_id(id, name, user_id))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,7 +91,7 @@ impl TagRepository for SqliteTagRepository {
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&tag.name)
|
||||
.bind(tag.name.as_ref()) // Use .as_ref() to get the inner &str
|
||||
.bind(&user_id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
@@ -160,7 +164,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::db::{DatabaseConfig, DatabasePool, create_pool, run_migrations};
|
||||
use crate::user_repository::SqliteUserRepository;
|
||||
use notes_domain::{User, UserRepository};
|
||||
use notes_domain::{Email, User, UserRepository};
|
||||
|
||||
async fn setup_test_db() -> SqlitePool {
|
||||
let config = DatabaseConfig::in_memory();
|
||||
@@ -172,7 +176,8 @@ mod tests {
|
||||
|
||||
async fn create_test_user(pool: &SqlitePool) -> User {
|
||||
let user_repo = SqliteUserRepository::new(pool.clone());
|
||||
let user = User::new("test|user", "test@example.com");
|
||||
let email = Email::try_from("test@example.com").unwrap();
|
||||
let user = User::new("test|user", email);
|
||||
user_repo.save(&user).await.unwrap();
|
||||
user
|
||||
}
|
||||
@@ -183,12 +188,13 @@ mod tests {
|
||||
let user = create_test_user(&pool).await;
|
||||
let repo = SqliteTagRepository::new(pool);
|
||||
|
||||
let tag = Tag::new("work", user.id);
|
||||
let name = TagName::try_from("work").unwrap();
|
||||
let tag = Tag::new(name, user.id);
|
||||
repo.save(&tag).await.unwrap();
|
||||
|
||||
let found = repo.find_by_id(tag.id).await.unwrap();
|
||||
assert!(found.is_some());
|
||||
assert_eq!(found.unwrap().name, "work");
|
||||
assert_eq!(found.unwrap().name_str(), "work");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -197,7 +203,8 @@ mod tests {
|
||||
let user = create_test_user(&pool).await;
|
||||
let repo = SqliteTagRepository::new(pool);
|
||||
|
||||
let tag = Tag::new("important", user.id);
|
||||
let name = TagName::try_from("important").unwrap();
|
||||
let tag = Tag::new(name, user.id);
|
||||
repo.save(&tag).await.unwrap();
|
||||
|
||||
let found = repo.find_by_name(user.id, "important").await.unwrap();
|
||||
@@ -211,13 +218,15 @@ mod tests {
|
||||
let user = create_test_user(&pool).await;
|
||||
let repo = SqliteTagRepository::new(pool);
|
||||
|
||||
repo.save(&Tag::new("alpha", user.id)).await.unwrap();
|
||||
repo.save(&Tag::new("beta", user.id)).await.unwrap();
|
||||
let name_alpha = TagName::try_from("alpha").unwrap();
|
||||
let name_beta = TagName::try_from("beta").unwrap();
|
||||
repo.save(&Tag::new(name_alpha, user.id)).await.unwrap();
|
||||
repo.save(&Tag::new(name_beta, user.id)).await.unwrap();
|
||||
|
||||
let tags = repo.find_by_user(user.id).await.unwrap();
|
||||
assert_eq!(tags.len(), 2);
|
||||
// Should be sorted alphabetically
|
||||
assert_eq!(tags[0].name, "alpha");
|
||||
assert_eq!(tags[1].name, "beta");
|
||||
assert_eq!(tags[0].name_str(), "alpha");
|
||||
assert_eq!(tags[1].name_str(), "beta");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
|
||||
use sqlx::{FromRow, SqlitePool};
|
||||
use uuid::Uuid;
|
||||
|
||||
use notes_domain::{DomainError, DomainResult, User, UserRepository};
|
||||
use notes_domain::{DomainError, DomainResult, Email, User, UserRepository};
|
||||
|
||||
/// SQLite adapter for UserRepository
|
||||
pub struct SqliteUserRepository {
|
||||
@@ -43,10 +43,14 @@ impl TryFrom<UserRow> for User {
|
||||
})
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid datetime: {}", e)))?;
|
||||
|
||||
// Parse email from string - it was validated when originally stored
|
||||
let email = Email::try_from(row.email)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid email in DB: {}", e)))?;
|
||||
|
||||
Ok(User::with_id(
|
||||
id,
|
||||
row.subject,
|
||||
row.email,
|
||||
email,
|
||||
row.password_hash,
|
||||
created_at,
|
||||
))
|
||||
@@ -108,7 +112,7 @@ impl UserRepository for SqliteUserRepository {
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&user.subject)
|
||||
.bind(&user.email)
|
||||
.bind(user.email.as_ref()) // Use .as_ref() to get the inner &str
|
||||
.bind(&user.password_hash)
|
||||
.bind(&created_at)
|
||||
.execute(&self.pool)
|
||||
@@ -148,14 +152,15 @@ mod tests {
|
||||
let pool = setup_test_db().await;
|
||||
let repo = SqliteUserRepository::new(pool);
|
||||
|
||||
let user = User::new("oidc|123", "test@example.com");
|
||||
let email = Email::try_from("test@example.com").unwrap();
|
||||
let user = User::new("oidc|123", email);
|
||||
repo.save(&user).await.unwrap();
|
||||
|
||||
let found = repo.find_by_id(user.id).await.unwrap();
|
||||
assert!(found.is_some());
|
||||
let found = found.unwrap();
|
||||
assert_eq!(found.subject, "oidc|123");
|
||||
assert_eq!(found.email, "test@example.com");
|
||||
assert_eq!(found.email_str(), "test@example.com");
|
||||
assert!(found.password_hash.is_none());
|
||||
}
|
||||
|
||||
@@ -164,13 +169,14 @@ mod tests {
|
||||
let pool = setup_test_db().await;
|
||||
let repo = SqliteUserRepository::new(pool);
|
||||
|
||||
let user = User::new_local("local@example.com", "hashed_pw");
|
||||
let email = Email::try_from("local@example.com").unwrap();
|
||||
let user = User::new_local(email, "hashed_pw");
|
||||
repo.save(&user).await.unwrap();
|
||||
|
||||
let found = repo.find_by_id(user.id).await.unwrap();
|
||||
assert!(found.is_some());
|
||||
let found = found.unwrap();
|
||||
assert_eq!(found.email, "local@example.com");
|
||||
assert_eq!(found.email_str(), "local@example.com");
|
||||
assert_eq!(found.password_hash, Some("hashed_pw".to_string()));
|
||||
}
|
||||
|
||||
@@ -179,7 +185,8 @@ mod tests {
|
||||
let pool = setup_test_db().await;
|
||||
let repo = SqliteUserRepository::new(pool);
|
||||
|
||||
let user = User::new("google|456", "user@gmail.com");
|
||||
let email = Email::try_from("user@gmail.com").unwrap();
|
||||
let user = User::new("google|456", email);
|
||||
repo.save(&user).await.unwrap();
|
||||
|
||||
let found = repo.find_by_subject("google|456").await.unwrap();
|
||||
@@ -192,7 +199,8 @@ mod tests {
|
||||
let pool = setup_test_db().await;
|
||||
let repo = SqliteUserRepository::new(pool);
|
||||
|
||||
let user = User::new("test|789", "delete@test.com");
|
||||
let email = Email::try_from("delete@test.com").unwrap();
|
||||
let user = User::new("test|789", email);
|
||||
repo.save(&user).await.unwrap();
|
||||
repo.delete(user.id).await.unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user