From f4fd915e3564059c0827d673188c5e8fcc4abf2a Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Tue, 9 Jun 2026 00:19:02 +0200 Subject: [PATCH] refactor: replace sqlx compile-time macros with runtime queries No longer requires DATABASE_URL at build time. --- crates/adapters/sqlite/src/import_profile.rs | 58 ++++--- crates/adapters/sqlite/src/import_session.rs | 55 +++--- crates/adapters/sqlite/src/lib.rs | 169 +++++++++---------- crates/adapters/sqlite/src/profile_fields.rs | 34 ++-- 4 files changed, 171 insertions(+), 145 deletions(-) diff --git a/crates/adapters/sqlite/src/import_profile.rs b/crates/adapters/sqlite/src/import_profile.rs index ee791ca..7d3bd23 100644 --- a/crates/adapters/sqlite/src/import_profile.rs +++ b/crates/adapters/sqlite/src/import_profile.rs @@ -118,15 +118,15 @@ impl ImportProfileRepository for SqliteImportProfileRepository { let user_id = p.user_id.value().to_string(); let created_at = p.created_at.format("%Y-%m-%d %H:%M:%S").to_string(); let field_mappings = serialize_mappings(&p.field_mappings)?; - sqlx::query!( + sqlx::query( "INSERT OR REPLACE INTO import_profiles (id, user_id, name, field_mappings, created_at) VALUES (?, ?, ?, ?, ?)", - id, - user_id, - p.name, - field_mappings, - created_at ) + .bind(&id) + .bind(&user_id) + .bind(&p.name) + .bind(&field_mappings) + .bind(&created_at) .execute(&self.pool) .await .map(|_| ()) @@ -135,29 +135,33 @@ impl ImportProfileRepository for SqliteImportProfileRepository { async fn list_for_user(&self, user_id: &UserId) -> Result, DomainError> { let uid = user_id.value().to_string(); - let rows = sqlx::query!( + let rows = sqlx::query( "SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE user_id = ? ORDER BY created_at DESC", - uid ) + .bind(&uid) .fetch_all(&self.pool) .await .map_err(Self::map_err)?; - rows.into_iter() + rows.iter() .map(|r| { + use sqlx::Row; + let id_str: String = r.get("id"); + let uid_str: String = r.get("user_id"); + let fm: String = r.get("field_mappings"); + let ca: String = r.get("created_at"); Ok(ImportProfile { id: ImportProfileId::from_uuid( - r.id.parse::() + id_str.parse::() .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, ), user_id: UserId::from_uuid( - r.user_id - .parse::() + uid_str.parse::() .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, ), - name: r.name, - field_mappings: deserialize_mappings(&r.field_mappings)?, - created_at: Self::parse_dt(&r.created_at)?, + name: r.get("name"), + field_mappings: deserialize_mappings(&fm)?, + created_at: Self::parse_dt(&ca)?, }) }) .collect() @@ -170,28 +174,33 @@ impl ImportProfileRepository for SqliteImportProfileRepository { ) -> Result, DomainError> { let id_str = id.value().to_string(); let uid_str = user_id.value().to_string(); - let row = sqlx::query!( + let row = sqlx::query( "SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE id = ? AND user_id = ?", - id_str, uid_str ) + .bind(&id_str) + .bind(&uid_str) .fetch_optional(&self.pool) .await .map_err(Self::map_err)?; row.map(|r| { + use sqlx::Row; + let rid: String = r.get("id"); + let ruid: String = r.get("user_id"); + let fm: String = r.get("field_mappings"); + let ca: String = r.get("created_at"); Ok(ImportProfile { id: ImportProfileId::from_uuid( - r.id.parse::() + rid.parse::() .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, ), user_id: UserId::from_uuid( - r.user_id - .parse::() + ruid.parse::() .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, ), - name: r.name, - field_mappings: deserialize_mappings(&r.field_mappings)?, - created_at: Self::parse_dt(&r.created_at)?, + name: r.get("name"), + field_mappings: deserialize_mappings(&fm)?, + created_at: Self::parse_dt(&ca)?, }) }) .transpose() @@ -199,7 +208,8 @@ impl ImportProfileRepository for SqliteImportProfileRepository { async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError> { let id_str = id.value().to_string(); - sqlx::query!("DELETE FROM import_profiles WHERE id = ?", id_str) + sqlx::query("DELETE FROM import_profiles WHERE id = ?") + .bind(&id_str) .execute(&self.pool) .await .map(|_| ()) diff --git a/crates/adapters/sqlite/src/import_session.rs b/crates/adapters/sqlite/src/import_session.rs index b6aded1..4bf5662 100644 --- a/crates/adapters/sqlite/src/import_session.rs +++ b/crates/adapters/sqlite/src/import_session.rs @@ -302,11 +302,17 @@ impl ImportSessionRepository for SqliteImportSessionRepository { let created_at = s.created_at.format("%Y-%m-%d %H:%M:%S").to_string(); let expires_at = s.expires_at.format("%Y-%m-%d %H:%M:%S").to_string(); let (parsed_data, field_mappings, row_results) = Self::serialize_session(s)?; - sqlx::query!( + sqlx::query( "INSERT INTO import_sessions (id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?)", - id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at ) + .bind(&id) + .bind(&user_id) + .bind(&parsed_data) + .bind(&field_mappings) + .bind(&row_results) + .bind(&created_at) + .bind(&expires_at) .execute(&self.pool) .await .map(|_| ()) @@ -320,25 +326,26 @@ impl ImportSessionRepository for SqliteImportSessionRepository { ) -> Result, DomainError> { let id_str = id.value().to_string(); let uid_str = user_id.value().to_string(); - let row = sqlx::query!( + let row = sqlx::query( "SELECT id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at FROM import_sessions WHERE id = ? AND user_id = ?", - id_str, - uid_str ) + .bind(&id_str) + .bind(&uid_str) .fetch_optional(&self.pool) .await .map_err(Self::map_err)?; row.map(|r| { + use sqlx::Row; Self::deserialize_session( - r.id, - r.user_id, - r.parsed_data, - r.field_mappings, - r.row_results, - &r.created_at, - &r.expires_at, + r.get("id"), + r.get("user_id"), + r.get("parsed_data"), + r.get("field_mappings"), + r.get("row_results"), + &r.get::("created_at"), + &r.get::("expires_at"), ) }) .transpose() @@ -347,12 +354,12 @@ impl ImportSessionRepository for SqliteImportSessionRepository { async fn update(&self, s: &ImportSession) -> Result<(), DomainError> { let id = s.id.value().to_string(); let (_, field_mappings, row_results) = Self::serialize_session(s)?; - sqlx::query!( + sqlx::query( "UPDATE import_sessions SET field_mappings = ?, row_results = ? WHERE id = ?", - field_mappings, - row_results, - id ) + .bind(&field_mappings) + .bind(&row_results) + .bind(&id) .execute(&self.pool) .await .map(|_| ()) @@ -361,7 +368,8 @@ impl ImportSessionRepository for SqliteImportSessionRepository { async fn delete(&self, id: &ImportSessionId) -> Result<(), DomainError> { let id_str = id.value().to_string(); - sqlx::query!("DELETE FROM import_sessions WHERE id = ?", id_str) + sqlx::query("DELETE FROM import_sessions WHERE id = ?") + .bind(&id_str) .execute(&self.pool) .await .map(|_| ()) @@ -369,19 +377,20 @@ impl ImportSessionRepository for SqliteImportSessionRepository { } async fn delete_expired(&self) -> Result { - let result = sqlx::query!("DELETE FROM import_sessions WHERE expires_at < datetime('now')") - .execute(&self.pool) - .await - .map_err(Self::map_err)?; + let result = + sqlx::query("DELETE FROM import_sessions WHERE expires_at < datetime('now')") + .execute(&self.pool) + .await + .map_err(Self::map_err)?; Ok(result.rows_affected()) } async fn delete_expired_for_user(&self, user_id: &UserId) -> Result<(), DomainError> { let uid = user_id.value().to_string(); - sqlx::query!( + sqlx::query( "DELETE FROM import_sessions WHERE user_id = ? AND expires_at < datetime('now')", - uid ) + .bind(&uid) .execute(&self.pool) .await .map(|_| ()) diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index bed7e0d..df675d7 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -96,14 +96,17 @@ impl SqliteMovieRepository { async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result { match movie_id { - None => sqlx::query_scalar!("SELECT COUNT(*) FROM reviews") - .fetch_one(&self.pool) - .await - .map_err(Self::map_err), - Some(id) => sqlx::query_scalar!("SELECT COUNT(*) FROM reviews WHERE movie_id = ?", id) + None => sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews") .fetch_one(&self.pool) .await .map_err(Self::map_err), + Some(id) => { + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = ?") + .bind(id) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + } } } @@ -234,13 +237,12 @@ impl SqliteMovieRepository { } async fn fetch_user_totals(&self, user_id: &str) -> Result { - sqlx::query_as!( - UserTotalsRow, - r#"SELECT COUNT(DISTINCT movie_id) AS "total!: i64", - AVG(CAST(rating AS REAL)) AS avg_rating - FROM reviews WHERE user_id = ?"#, - user_id + sqlx::query_as::<_, UserTotalsRow>( + "SELECT COUNT(DISTINCT movie_id) AS total, + AVG(CAST(rating AS REAL)) AS avg_rating + FROM reviews WHERE user_id = ?", ) + .bind(user_id) .fetch_one(&self.pool) .await .map_err(Self::map_err) @@ -250,7 +252,7 @@ impl SqliteMovieRepository { &self, user_id: &str, ) -> Result, DomainError> { - let row = sqlx::query_scalar!( + let row: Option = sqlx::query_scalar( "SELECT m.director FROM reviews r INNER JOIN movies m ON m.id = r.movie_id @@ -258,31 +260,31 @@ impl SqliteMovieRepository { GROUP BY m.director ORDER BY COUNT(*) DESC LIMIT 1", - user_id ) + .bind(user_id) .fetch_optional(&self.pool) .await .map_err(Self::map_err)?; - Ok(row.flatten()) + Ok(row) } async fn fetch_user_most_active_month( &self, user_id: &str, ) -> Result, DomainError> { - let result: Option> = sqlx::query_scalar!( - "SELECT strftime('%Y-%m', watched_at) AS month + let row: Option = sqlx::query_scalar( + "SELECT strftime('%Y-%m', watched_at) FROM reviews WHERE user_id = ? - GROUP BY month + GROUP BY strftime('%Y-%m', watched_at) ORDER BY COUNT(*) DESC LIMIT 1", - user_id ) + .bind(user_id) .fetch_optional(&self.pool) .await .map_err(Self::map_err)?; - Ok(result.flatten()) + Ok(row) } } @@ -293,12 +295,11 @@ impl MovieRepository for SqliteMovieRepository { external_metadata_id: &ExternalMetadataId, ) -> Result, DomainError> { let id = external_metadata_id.value(); - sqlx::query_as!( - MovieRow, + sqlx::query_as::<_, MovieRow>( "SELECT id, external_metadata_id, title, release_year, director, poster_path FROM movies WHERE external_metadata_id = ?", - id ) + .bind(id) .fetch_optional(&self.pool) .await .map_err(Self::map_err)? @@ -308,12 +309,11 @@ impl MovieRepository for SqliteMovieRepository { async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { let id = movie_id.value().to_string(); - sqlx::query_as!( - MovieRow, + sqlx::query_as::<_, MovieRow>( "SELECT id, external_metadata_id, title, release_year, director, poster_path FROM movies WHERE id = ?", - id ) + .bind(&id) .fetch_optional(&self.pool) .await .map_err(Self::map_err)? @@ -326,15 +326,14 @@ impl MovieRepository for SqliteMovieRepository { title: &MovieTitle, year: &ReleaseYear, ) -> Result, DomainError> { - let title = title.value(); - let year = year.value() as i64; - sqlx::query_as!( - MovieRow, + let t = title.value(); + let y = year.value() as i64; + sqlx::query_as::<_, MovieRow>( "SELECT id, external_metadata_id, title, release_year, director, poster_path FROM movies WHERE title = ? AND release_year = ?", - title, - year ) + .bind(t) + .bind(y) .fetch_all(&self.pool) .await .map_err(Self::map_err)? @@ -351,7 +350,7 @@ impl MovieRepository for SqliteMovieRepository { let director = movie.director(); let poster_path = movie.poster_path().map(|p| p.value().to_string()); - sqlx::query!( + sqlx::query( "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET @@ -360,13 +359,13 @@ impl MovieRepository for SqliteMovieRepository { release_year = excluded.release_year, director = excluded.director, poster_path = excluded.poster_path", - id, - external_metadata_id, - title, - release_year, - director, - poster_path ) + .bind(&id) + .bind(&external_metadata_id) + .bind(title) + .bind(release_year) + .bind(director) + .bind(&poster_path) .execute(&self.pool) .await .map_err(Self::map_err)?; @@ -376,7 +375,8 @@ impl MovieRepository for SqliteMovieRepository { async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> { let id = movie_id.value().to_string(); - sqlx::query!("DELETE FROM movies WHERE id = ?", id) + sqlx::query("DELETE FROM movies WHERE id = ?") + .bind(&id) .execute(&self.pool) .await .map_err(Self::map_err)?; @@ -527,18 +527,18 @@ impl ReviewRepository for SqliteMovieRepository { ReviewSource::Remote { actor_url } => Some(actor_url.clone()), }; - sqlx::query!( + sqlx::query( "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - id, - movie_id, - user_id, - rating, - comment, - watched_at, - created_at, - remote_actor_url ) + .bind(&id) + .bind(&movie_id) + .bind(&user_id) + .bind(rating) + .bind(&comment) + .bind(&watched_at) + .bind(&created_at) + .bind(&remote_actor_url) .execute(&self.pool) .await .map_err(Self::map_err)?; @@ -554,12 +554,11 @@ impl ReviewRepository for SqliteMovieRepository { async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { let id = review_id.value().to_string(); - sqlx::query_as!( - ReviewRow, + sqlx::query_as::<_, ReviewRow>( "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url FROM reviews WHERE id = ?", - id ) + .bind(&id) .fetch_optional(&self.pool) .await .map_err(Self::map_err)? @@ -569,7 +568,8 @@ impl ReviewRepository for SqliteMovieRepository { async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> { let id = review_id.value().to_string(); - sqlx::query!("DELETE FROM reviews WHERE id = ?", id) + sqlx::query("DELETE FROM reviews WHERE id = ?") + .bind(&id) .execute(&self.pool) .await .map_err(Self::map_err)?; @@ -760,24 +760,22 @@ impl DiaryRepository for SqliteMovieRepository { async fn get_review_history(&self, movie_id: &MovieId) -> Result { let id_str = movie_id.value().to_string(); - let movie = sqlx::query_as!( - MovieRow, + let movie = sqlx::query_as::<_, MovieRow>( "SELECT id, external_metadata_id, title, release_year, director, poster_path FROM movies WHERE id = ?", - id_str ) + .bind(&id_str) .fetch_optional(&self.pool) .await .map_err(Self::map_err)? .ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))? .into_domain()?; - let viewings = sqlx::query_as!( - ReviewRow, + let viewings = sqlx::query_as::<_, ReviewRow>( "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC", - id_str ) + .bind(&id_str) .fetch_all(&self.pool) .await .map_err(Self::map_err)? @@ -790,16 +788,15 @@ impl DiaryRepository for SqliteMovieRepository { async fn get_user_history(&self, user_id: &UserId) -> Result, DomainError> { let uid = user_id.value().to_string(); - let rows = sqlx::query_as!( - DiaryRow, + let rows = sqlx::query_as::<_, DiaryRow>( "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ? ORDER BY r.watched_at DESC", - uid ) + .bind(&uid) .fetch_all(&self.pool) .await .map_err(Self::map_err)?; @@ -837,10 +834,12 @@ impl DiaryRepository for SqliteMovieRepository { let limit = page.limit as i64; let offset = page.offset as i64; - let total = sqlx::query_scalar!("SELECT COUNT(*) FROM reviews WHERE movie_id = ?", id_str) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err)?; + let total: i64 = + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = ?") + .bind(&id_str) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)?; let rows = sqlx::query_as::<_, FeedRow>( "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, @@ -911,30 +910,28 @@ impl StatsRepository for SqliteMovieRepository { let uid = user_id.value().to_string(); let (rating_rows, director_rows) = tokio::try_join!( - sqlx::query_as!( - MonthlyRatingRow, - r#"SELECT strftime('%Y-%m', watched_at) AS "month!", - AVG(CAST(rating AS REAL)) AS "avg_rating!: f64", - COUNT(*) AS "count!: i64" - FROM reviews - WHERE user_id = ? AND watched_at >= datetime('now', '-12 months') - GROUP BY "month!" - ORDER BY "month!" ASC"#, - uid + sqlx::query_as::<_, MonthlyRatingRow>( + "SELECT strftime('%Y-%m', watched_at) AS month, + AVG(CAST(rating AS REAL)) AS avg_rating, + COUNT(*) AS count + FROM reviews + WHERE user_id = ? AND watched_at >= datetime('now', '-12 months') + GROUP BY month + ORDER BY month ASC", ) + .bind(&uid) .fetch_all(&self.pool), - sqlx::query_as!( - DirectorCountRow, - r#"SELECT m.director AS "director!", - COUNT(*) AS "count!: i64" - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = ? AND m.director IS NOT NULL - GROUP BY m.director - ORDER BY COUNT(*) DESC - LIMIT 5"#, - uid + sqlx::query_as::<_, DirectorCountRow>( + "SELECT m.director, + COUNT(*) AS count + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? AND m.director IS NOT NULL + GROUP BY m.director + ORDER BY COUNT(*) DESC + LIMIT 5", ) + .bind(&uid) .fetch_all(&self.pool) ) .map_err(Self::map_err)?; diff --git a/crates/adapters/sqlite/src/profile_fields.rs b/crates/adapters/sqlite/src/profile_fields.rs index 831bd30..5e13feb 100644 --- a/crates/adapters/sqlite/src/profile_fields.rs +++ b/crates/adapters/sqlite/src/profile_fields.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use sqlx::SqlitePool; +use sqlx::{Row, SqlitePool}; use domain::{ errors::DomainError, models::ProfileField, ports::UserProfileFieldsRepository, @@ -14,25 +14,30 @@ impl SqliteProfileFieldsRepository { pub fn new(pool: SqlitePool) -> Self { Self { pool } } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } } #[async_trait] impl UserProfileFieldsRepository for SqliteProfileFieldsRepository { async fn get_fields(&self, user_id: &UserId) -> Result, DomainError> { let id_str = user_id.value().to_string(); - let rows = sqlx::query!( + let rows = sqlx::query( "SELECT name, value FROM user_profile_fields WHERE user_id = ? ORDER BY position ASC", - id_str ) + .bind(&id_str) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + .map_err(Self::map_err)?; Ok(rows - .into_iter() + .iter() .map(|r| ProfileField { - name: r.name, - value: r.value, + name: r.get("name"), + value: r.get("value"), }) .collect()) } @@ -44,21 +49,26 @@ impl UserProfileFieldsRepository for SqliteProfileFieldsRepository { ) -> Result<(), DomainError> { let id_str = user_id.value().to_string(); - sqlx::query!("DELETE FROM user_profile_fields WHERE user_id = ?", id_str) + sqlx::query("DELETE FROM user_profile_fields WHERE user_id = ?") + .bind(&id_str) .execute(&self.pool) .await - .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + .map_err(Self::map_err)?; for (i, field) in fields.into_iter().enumerate() { let id = uuid::Uuid::new_v4().to_string(); let position = i as i64; - sqlx::query!( + sqlx::query( "INSERT INTO user_profile_fields (id, user_id, name, value, position) VALUES (?, ?, ?, ?, ?)", - id, id_str, field.name, field.value, position ) + .bind(&id) + .bind(&id_str) + .bind(&field.name) + .bind(&field.value) + .bind(position) .execute(&self.pool) .await - .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + .map_err(Self::map_err)?; } Ok(())