This commit is contained in:
2026-01-02 05:27:44 +01:00
commit 8dfb3c6922
30 changed files with 7394 additions and 0 deletions

37
template-infra/Cargo.toml Normal file
View File

@@ -0,0 +1,37 @@
[package]
name = "template-infra"
version = "0.1.0"
edition = "2024"
[features]
default = ["sqlite", "smart-features", "broker-nats"]
sqlite = ["sqlx/sqlite", "tower-sessions-sqlx-store/sqlite"]
postgres = [
"sqlx/postgres",
"tower-sessions-sqlx-store/postgres",
"k-core/postgres",
]
smart-features = ["dep:fastembed", "dep:qdrant-client"]
broker-nats = ["dep:async-nats", "dep:futures-util"]
[dependencies]
k-core = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-core", features = [
"db-sqlx",
] }
template-domain = { path = "../template-domain" }
async-trait = "0.1.89"
chrono = { version = "0.4.42", features = ["serde"] }
sqlx = { version = "0.8.6", features = ["runtime-tokio", "chrono", "migrate"] }
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["full"] }
tracing = "0.1"
uuid = { version = "1.19.0", features = ["v4", "serde"] }
tower-sessions = "0.14.0"
tower-sessions-sqlx-store = { version = "0.15.0", default-features = false }
fastembed = { version = "5.4", optional = true }
qdrant-client = { version = "1.16", optional = true }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
async-nats = { version = "0.45", optional = true }
futures-util = { version = "0.3", optional = true }
futures-core = "0.3"

126
template-infra/src/db.rs Normal file
View File

@@ -0,0 +1,126 @@
//! Database connection pool management
use sqlx::Pool;
#[cfg(feature = "postgres")]
use sqlx::Postgres;
#[cfg(feature = "sqlite")]
use sqlx::Sqlite;
#[cfg(feature = "sqlite")]
use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
#[cfg(feature = "sqlite")]
use std::str::FromStr;
use std::time::Duration;
/// Configuration for the database connection
#[derive(Debug, Clone)]
pub struct DatabaseConfig {
pub url: String,
pub max_connections: u32,
pub min_connections: u32,
pub acquire_timeout: Duration,
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
url: "sqlite:data.db?mode=rwc".to_string(),
max_connections: 5,
min_connections: 1,
acquire_timeout: Duration::from_secs(5),
}
}
}
impl DatabaseConfig {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
..Default::default()
}
}
pub fn in_memory() -> Self {
Self {
url: "sqlite::memory:".to_string(),
max_connections: 1, // SQLite in-memory is single-connection
min_connections: 1,
..Default::default()
}
}
}
#[derive(Clone, Debug)]
pub enum DatabasePool {
#[cfg(feature = "sqlite")]
Sqlite(Pool<Sqlite>),
#[cfg(feature = "postgres")]
Postgres(Pool<Postgres>),
}
/// Create a database connection pool
#[cfg(feature = "sqlite")]
pub async fn create_pool(config: &DatabaseConfig) -> Result<SqlitePool, sqlx::Error> {
let options = SqliteConnectOptions::from_str(&config.url)?
.create_if_missing(true)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(30));
let pool = SqlitePoolOptions::new()
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.acquire_timeout(config.acquire_timeout)
.connect_with(options)
.await?;
Ok(pool)
}
/// Run database migrations
pub async fn run_migrations(pool: &DatabasePool) -> Result<(), sqlx::Error> {
match pool {
#[cfg(feature = "sqlite")]
DatabasePool::Sqlite(pool) => {
sqlx::migrate!("../migrations").run(pool).await?;
}
#[cfg(feature = "postgres")]
DatabasePool::Postgres(_pool) => {
// Placeholder for Postgres migrations
// sqlx::migrate!("../migrations/postgres").run(_pool).await?;
tracing::warn!("Postgres migrations not yet implemented");
return Err(sqlx::Error::Configuration(
"Postgres migrations not yet implemented".into(),
));
}
#[allow(unreachable_patterns)]
_ => {
return Err(sqlx::Error::Configuration(
"No database feature enabled".into(),
));
}
}
tracing::info!("Database migrations completed successfully");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_create_in_memory_pool() {
let config = DatabaseConfig::in_memory();
let pool = create_pool(&config).await;
assert!(pool.is_ok());
}
#[tokio::test]
async fn test_run_migrations() {
let config = DatabaseConfig::in_memory();
let pool = create_pool(&config).await.unwrap();
let db_pool = DatabasePool::Sqlite(pool);
let result = run_migrations(&db_pool).await;
assert!(result.is_ok());
}
}

View File

@@ -0,0 +1,52 @@
use std::sync::Arc;
use crate::db::DatabasePool;
#[cfg(feature = "sqlite")]
use crate::SqliteUserRepository;
use template_domain::UserRepository;
#[derive(Debug, thiserror::Error)]
pub enum FactoryError {
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Not implemented: {0}")]
NotImplemented(String),
#[error("Infrastructure error: {0}")]
Infrastructure(#[from] template_domain::DomainError),
}
pub type FactoryResult<T> = Result<T, FactoryError>;
pub async fn build_user_repository(pool: &DatabasePool) -> FactoryResult<Arc<dyn UserRepository>> {
match pool {
#[cfg(feature = "sqlite")]
DatabasePool::Sqlite(pool) => Ok(Arc::new(SqliteUserRepository::new(pool.clone()))),
#[cfg(feature = "postgres")]
DatabasePool::Postgres(pool) => Ok(Arc::new(crate::user_repository::PostgresUserRepository::new(pool.clone()))),
#[allow(unreachable_patterns)]
_ => Err(FactoryError::NotImplemented(
"No database feature enabled".to_string(),
)),
}
}
pub async fn build_session_store(
pool: &DatabasePool,
) -> FactoryResult<crate::session_store::InfraSessionStore> {
match pool {
#[cfg(feature = "sqlite")]
DatabasePool::Sqlite(pool) => {
let store = tower_sessions_sqlx_store::SqliteStore::new(pool.clone());
Ok(crate::session_store::InfraSessionStore::Sqlite(store))
}
#[cfg(feature = "postgres")]
DatabasePool::Postgres(pool) => {
let store = tower_sessions_sqlx_store::PostgresStore::new(pool.clone());
Ok(crate::session_store::InfraSessionStore::Postgres(store))
}
#[allow(unreachable_patterns)]
_ => Err(FactoryError::NotImplemented(
"No database feature enabled".to_string(),
)),
}
}

25
template-infra/src/lib.rs Normal file
View File

@@ -0,0 +1,25 @@
//! K-Notes Infrastructure Layer
//!
//! This crate provides concrete implementations (adapters) for the
//! repository ports defined in the domain layer.
//!
//! ## Adapters
//!
//! - [`SqliteNoteRepository`] - SQLite adapter for notes with FTS5 search
//! - [`SqliteUserRepository`] - SQLite adapter for users (OIDC-ready)
//! - [`SqliteTagRepository`] - SQLite adapter for tags
//!
//! ## Database
//!
//! - [`db::create_pool`] - Create a database connection pool
//! - [`db::run_migrations`] - Run database migrations
pub mod db;
pub mod factory;
pub mod session_store;
mod user_repository;
// Re-export for convenience
pub use db::{DatabaseConfig, run_migrations};
#[cfg(feature = "sqlite")]
pub use user_repository::SqliteUserRepository;

View File

@@ -0,0 +1,73 @@
use async_trait::async_trait;
use sqlx;
use tower_sessions::{
SessionStore,
session::{Id, Record},
};
#[cfg(feature = "postgres")]
use tower_sessions_sqlx_store::PostgresStore;
#[cfg(feature = "sqlite")]
use tower_sessions_sqlx_store::SqliteStore;
#[derive(Clone, Debug)]
pub enum InfraSessionStore {
#[cfg(feature = "sqlite")]
Sqlite(SqliteStore),
#[cfg(feature = "postgres")]
Postgres(PostgresStore),
}
#[async_trait]
impl SessionStore for InfraSessionStore {
async fn save(&self, session_record: &Record) -> tower_sessions::session_store::Result<()> {
match self {
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.save(session_record).await,
#[cfg(feature = "postgres")]
Self::Postgres(store) => store.save(session_record).await,
#[allow(unreachable_patterns)]
_ => Err(tower_sessions::session_store::Error::Backend(
"No backend enabled".to_string(),
)),
}
}
async fn load(&self, session_id: &Id) -> tower_sessions::session_store::Result<Option<Record>> {
match self {
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.load(session_id).await,
#[cfg(feature = "postgres")]
Self::Postgres(store) => store.load(session_id).await,
#[allow(unreachable_patterns)]
_ => Err(tower_sessions::session_store::Error::Backend(
"No backend enabled".to_string(),
)),
}
}
async fn delete(&self, session_id: &Id) -> tower_sessions::session_store::Result<()> {
match self {
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.delete(session_id).await,
#[cfg(feature = "postgres")]
Self::Postgres(store) => store.delete(session_id).await,
#[allow(unreachable_patterns)]
_ => Err(tower_sessions::session_store::Error::Backend(
"No backend enabled".to_string(),
)),
}
}
}
impl InfraSessionStore {
pub async fn migrate(&self) -> Result<(), sqlx::Error> {
match self {
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.migrate().await,
#[cfg(feature = "postgres")]
Self::Postgres(store) => store.migrate().await,
#[allow(unreachable_patterns)]
_ => Err(sqlx::Error::Configuration("No backend enabled".into())),
}
}
}

View File

@@ -0,0 +1,306 @@
//! SQLite implementation of UserRepository
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use sqlx::{FromRow, SqlitePool};
use uuid::Uuid;
use template_domain::{DomainError, DomainResult, Email, User, UserRepository};
/// SQLite adapter for UserRepository
#[cfg(feature = "sqlite")]
#[derive(Clone)]
pub struct SqliteUserRepository {
pool: SqlitePool,
}
#[cfg(feature = "sqlite")]
impl SqliteUserRepository {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
}
/// Row type for SQLite query results
#[derive(Debug, FromRow)]
struct UserRow {
id: String,
subject: String,
email: String,
password_hash: Option<String>,
created_at: String,
}
impl TryFrom<UserRow> for User {
type Error = DomainError;
fn try_from(row: UserRow) -> Result<Self, Self::Error> {
let id = Uuid::parse_str(&row.id)
.map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?;
let created_at = DateTime::parse_from_rfc3339(&row.created_at)
.map(|dt| dt.with_timezone(&Utc))
.or_else(|_| {
// Fallback for SQLite datetime format
chrono::NaiveDateTime::parse_from_str(&row.created_at, "%Y-%m-%d %H:%M:%S")
.map(|dt| dt.and_utc())
})
.map_err(|e| DomainError::RepositoryError(format!("Invalid datetime: {}", e)))?;
// Parse email from string - it was validated when originally stored
let email = Email::try_from(row.email)
.map_err(|e| DomainError::RepositoryError(format!("Invalid email in DB: {}", e)))?;
Ok(User::with_id(
id,
row.subject,
email,
row.password_hash,
created_at,
))
}
}
#[cfg(feature = "sqlite")]
#[async_trait]
impl UserRepository for SqliteUserRepository {
async fn find_by_id(&self, id: Uuid) -> DomainResult<Option<User>> {
let id_str = id.to_string();
let row: Option<UserRow> = sqlx::query_as(
"SELECT id, subject, email, password_hash, created_at FROM users WHERE id = ?",
)
.bind(&id_str)
.fetch_optional(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
row.map(User::try_from).transpose()
}
async fn find_by_subject(&self, subject: &str) -> DomainResult<Option<User>> {
let row: Option<UserRow> = sqlx::query_as(
"SELECT id, subject, email, password_hash, created_at FROM users WHERE subject = ?",
)
.bind(subject)
.fetch_optional(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
row.map(User::try_from).transpose()
}
async fn find_by_email(&self, email: &str) -> DomainResult<Option<User>> {
let row: Option<UserRow> = sqlx::query_as(
"SELECT id, subject, email, password_hash, created_at FROM users WHERE email = ?",
)
.bind(email)
.fetch_optional(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
row.map(User::try_from).transpose()
}
async fn save(&self, user: &User) -> DomainResult<()> {
let id = user.id.to_string();
let created_at = user.created_at.to_rfc3339();
sqlx::query(
r#"
INSERT INTO users (id, subject, email, password_hash, created_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
subject = excluded.subject,
email = excluded.email,
password_hash = excluded.password_hash
"#,
)
.bind(&id)
.bind(&user.subject)
.bind(user.email.as_ref()) // Use .as_ref() to get the inner &str
.bind(&user.password_hash)
.bind(&created_at)
.execute(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
Ok(())
}
async fn delete(&self, id: Uuid) -> DomainResult<()> {
let id_str = id.to_string();
sqlx::query("DELETE FROM users WHERE id = ?")
.bind(&id_str)
.execute(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
Ok(())
}
}
#[cfg(all(test, feature = "sqlite"))]
mod tests {
use super::*;
use crate::db::{DatabaseConfig, DatabasePool, create_pool, run_migrations};
async fn setup_test_db() -> SqlitePool {
let config = DatabaseConfig::in_memory();
let pool = create_pool(&config).await.unwrap();
let db_pool = DatabasePool::Sqlite(pool.clone());
run_migrations(&db_pool).await.unwrap();
pool
}
#[tokio::test]
async fn test_save_and_find_user() {
let pool = setup_test_db().await;
let repo = SqliteUserRepository::new(pool);
let email = Email::try_from("test@example.com").unwrap();
let user = User::new("oidc|123", email);
repo.save(&user).await.unwrap();
let found = repo.find_by_id(user.id).await.unwrap();
assert!(found.is_some());
let found = found.unwrap();
assert_eq!(found.subject, "oidc|123");
assert_eq!(found.email_str(), "test@example.com");
assert!(found.password_hash.is_none());
}
#[tokio::test]
async fn test_save_and_find_user_with_password() {
let pool = setup_test_db().await;
let repo = SqliteUserRepository::new(pool);
let email = Email::try_from("local@example.com").unwrap();
let user = User::new_local(email, "hashed_pw");
repo.save(&user).await.unwrap();
let found = repo.find_by_id(user.id).await.unwrap();
assert!(found.is_some());
let found = found.unwrap();
assert_eq!(found.email_str(), "local@example.com");
assert_eq!(found.password_hash, Some("hashed_pw".to_string()));
}
#[tokio::test]
async fn test_find_by_subject() {
let pool = setup_test_db().await;
let repo = SqliteUserRepository::new(pool);
let email = Email::try_from("user@gmail.com").unwrap();
let user = User::new("google|456", email);
repo.save(&user).await.unwrap();
let found = repo.find_by_subject("google|456").await.unwrap();
assert!(found.is_some());
assert_eq!(found.unwrap().id, user.id);
}
#[tokio::test]
async fn test_delete_user() {
let pool = setup_test_db().await;
let repo = SqliteUserRepository::new(pool);
let email = Email::try_from("delete@test.com").unwrap();
let user = User::new("test|789", email);
repo.save(&user).await.unwrap();
repo.delete(user.id).await.unwrap();
let found = repo.find_by_id(user.id).await.unwrap();
assert!(found.is_none());
}
}
/// PostgreSQL adapter for UserRepository
#[cfg(feature = "postgres")]
#[derive(Clone)]
pub struct PostgresUserRepository {
pool: sqlx::Pool<sqlx::Postgres>,
}
#[cfg(feature = "postgres")]
impl PostgresUserRepository {
pub fn new(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
Self { pool }
}
}
#[cfg(feature = "postgres")]
#[async_trait]
impl UserRepository for PostgresUserRepository {
async fn find_by_id(&self, id: Uuid) -> DomainResult<Option<User>> {
let id_str = id.to_string();
let row: Option<UserRow> = sqlx::query_as(
"SELECT id, subject, email, password_hash, created_at FROM users WHERE id = $1",
)
.bind(&id_str)
.fetch_optional(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
row.map(User::try_from).transpose()
}
async fn find_by_subject(&self, subject: &str) -> DomainResult<Option<User>> {
let row: Option<UserRow> = sqlx::query_as(
"SELECT id, subject, email, password_hash, created_at FROM users WHERE subject = $1",
)
.bind(subject)
.fetch_optional(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
row.map(User::try_from).transpose()
}
async fn find_by_email(&self, email: &str) -> DomainResult<Option<User>> {
let row: Option<UserRow> = sqlx::query_as(
"SELECT id, subject, email, password_hash, created_at FROM users WHERE email = $1",
)
.bind(email)
.fetch_optional(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
row.map(User::try_from).transpose()
}
async fn save(&self, user: &User) -> DomainResult<()> {
let id = user.id.to_string();
let created_at = user.created_at.to_rfc3339();
sqlx::query(
r#"
INSERT INTO users (id, subject, email, password_hash, created_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT(id) DO UPDATE SET
subject = excluded.subject,
email = excluded.email,
password_hash = excluded.password_hash
"#,
)
.bind(&id)
.bind(&user.subject)
.bind(user.email.as_ref())
.bind(&user.password_hash)
.bind(&created_at)
.execute(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
Ok(())
}
async fn delete(&self, id: Uuid) -> DomainResult<()> {
let id_str = id.to_string();
sqlx::query("DELETE FROM users WHERE id = $1")
.bind(&id_str)
.execute(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
Ok(())
}
}