refactor: replace sqlx compile-time macros with runtime queries
Some checks failed
CI / Check / Test (push) Failing after 44s
Some checks failed
CI / Check / Test (push) Failing after 44s
No longer requires DATABASE_URL at build time.
This commit is contained in:
@@ -118,15 +118,15 @@ impl ImportProfileRepository for SqliteImportProfileRepository {
|
|||||||
let user_id = p.user_id.value().to_string();
|
let user_id = p.user_id.value().to_string();
|
||||||
let created_at = p.created_at.format("%Y-%m-%d %H:%M:%S").to_string();
|
let created_at = p.created_at.format("%Y-%m-%d %H:%M:%S").to_string();
|
||||||
let field_mappings = serialize_mappings(&p.field_mappings)?;
|
let field_mappings = serialize_mappings(&p.field_mappings)?;
|
||||||
sqlx::query!(
|
sqlx::query(
|
||||||
"INSERT OR REPLACE INTO import_profiles (id, user_id, name, field_mappings, created_at)
|
"INSERT OR REPLACE INTO import_profiles (id, user_id, name, field_mappings, created_at)
|
||||||
VALUES (?, ?, ?, ?, ?)",
|
VALUES (?, ?, ?, ?, ?)",
|
||||||
id,
|
|
||||||
user_id,
|
|
||||||
p.name,
|
|
||||||
field_mappings,
|
|
||||||
created_at
|
|
||||||
)
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&user_id)
|
||||||
|
.bind(&p.name)
|
||||||
|
.bind(&field_mappings)
|
||||||
|
.bind(&created_at)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
@@ -135,29 +135,33 @@ impl ImportProfileRepository for SqliteImportProfileRepository {
|
|||||||
|
|
||||||
async fn list_for_user(&self, user_id: &UserId) -> Result<Vec<ImportProfile>, DomainError> {
|
async fn list_for_user(&self, user_id: &UserId) -> Result<Vec<ImportProfile>, DomainError> {
|
||||||
let uid = user_id.value().to_string();
|
let uid = user_id.value().to_string();
|
||||||
let rows = sqlx::query!(
|
let rows = sqlx::query(
|
||||||
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE user_id = ? ORDER BY created_at DESC",
|
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE user_id = ? ORDER BY created_at DESC",
|
||||||
uid
|
|
||||||
)
|
)
|
||||||
|
.bind(&uid)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
rows.into_iter()
|
rows.iter()
|
||||||
.map(|r| {
|
.map(|r| {
|
||||||
|
use sqlx::Row;
|
||||||
|
let id_str: String = r.get("id");
|
||||||
|
let uid_str: String = r.get("user_id");
|
||||||
|
let fm: String = r.get("field_mappings");
|
||||||
|
let ca: String = r.get("created_at");
|
||||||
Ok(ImportProfile {
|
Ok(ImportProfile {
|
||||||
id: ImportProfileId::from_uuid(
|
id: ImportProfileId::from_uuid(
|
||||||
r.id.parse::<uuid::Uuid>()
|
id_str.parse::<uuid::Uuid>()
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
|
||||||
),
|
),
|
||||||
user_id: UserId::from_uuid(
|
user_id: UserId::from_uuid(
|
||||||
r.user_id
|
uid_str.parse::<uuid::Uuid>()
|
||||||
.parse::<uuid::Uuid>()
|
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
|
||||||
),
|
),
|
||||||
name: r.name,
|
name: r.get("name"),
|
||||||
field_mappings: deserialize_mappings(&r.field_mappings)?,
|
field_mappings: deserialize_mappings(&fm)?,
|
||||||
created_at: Self::parse_dt(&r.created_at)?,
|
created_at: Self::parse_dt(&ca)?,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
@@ -170,28 +174,33 @@ impl ImportProfileRepository for SqliteImportProfileRepository {
|
|||||||
) -> Result<Option<ImportProfile>, DomainError> {
|
) -> Result<Option<ImportProfile>, DomainError> {
|
||||||
let id_str = id.value().to_string();
|
let id_str = id.value().to_string();
|
||||||
let uid_str = user_id.value().to_string();
|
let uid_str = user_id.value().to_string();
|
||||||
let row = sqlx::query!(
|
let row = sqlx::query(
|
||||||
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE id = ? AND user_id = ?",
|
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE id = ? AND user_id = ?",
|
||||||
id_str, uid_str
|
|
||||||
)
|
)
|
||||||
|
.bind(&id_str)
|
||||||
|
.bind(&uid_str)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
row.map(|r| {
|
row.map(|r| {
|
||||||
|
use sqlx::Row;
|
||||||
|
let rid: String = r.get("id");
|
||||||
|
let ruid: String = r.get("user_id");
|
||||||
|
let fm: String = r.get("field_mappings");
|
||||||
|
let ca: String = r.get("created_at");
|
||||||
Ok(ImportProfile {
|
Ok(ImportProfile {
|
||||||
id: ImportProfileId::from_uuid(
|
id: ImportProfileId::from_uuid(
|
||||||
r.id.parse::<uuid::Uuid>()
|
rid.parse::<uuid::Uuid>()
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
|
||||||
),
|
),
|
||||||
user_id: UserId::from_uuid(
|
user_id: UserId::from_uuid(
|
||||||
r.user_id
|
ruid.parse::<uuid::Uuid>()
|
||||||
.parse::<uuid::Uuid>()
|
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
|
||||||
),
|
),
|
||||||
name: r.name,
|
name: r.get("name"),
|
||||||
field_mappings: deserialize_mappings(&r.field_mappings)?,
|
field_mappings: deserialize_mappings(&fm)?,
|
||||||
created_at: Self::parse_dt(&r.created_at)?,
|
created_at: Self::parse_dt(&ca)?,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.transpose()
|
.transpose()
|
||||||
@@ -199,7 +208,8 @@ impl ImportProfileRepository for SqliteImportProfileRepository {
|
|||||||
|
|
||||||
async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError> {
|
async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError> {
|
||||||
let id_str = id.value().to_string();
|
let id_str = id.value().to_string();
|
||||||
sqlx::query!("DELETE FROM import_profiles WHERE id = ?", id_str)
|
sqlx::query("DELETE FROM import_profiles WHERE id = ?")
|
||||||
|
.bind(&id_str)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
|
|||||||
@@ -302,11 +302,17 @@ impl ImportSessionRepository for SqliteImportSessionRepository {
|
|||||||
let created_at = s.created_at.format("%Y-%m-%d %H:%M:%S").to_string();
|
let created_at = s.created_at.format("%Y-%m-%d %H:%M:%S").to_string();
|
||||||
let expires_at = s.expires_at.format("%Y-%m-%d %H:%M:%S").to_string();
|
let expires_at = s.expires_at.format("%Y-%m-%d %H:%M:%S").to_string();
|
||||||
let (parsed_data, field_mappings, row_results) = Self::serialize_session(s)?;
|
let (parsed_data, field_mappings, row_results) = Self::serialize_session(s)?;
|
||||||
sqlx::query!(
|
sqlx::query(
|
||||||
"INSERT INTO import_sessions (id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at)
|
"INSERT INTO import_sessions (id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?)",
|
VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||||
id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at
|
|
||||||
)
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&user_id)
|
||||||
|
.bind(&parsed_data)
|
||||||
|
.bind(&field_mappings)
|
||||||
|
.bind(&row_results)
|
||||||
|
.bind(&created_at)
|
||||||
|
.bind(&expires_at)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
@@ -320,25 +326,26 @@ impl ImportSessionRepository for SqliteImportSessionRepository {
|
|||||||
) -> Result<Option<ImportSession>, DomainError> {
|
) -> Result<Option<ImportSession>, DomainError> {
|
||||||
let id_str = id.value().to_string();
|
let id_str = id.value().to_string();
|
||||||
let uid_str = user_id.value().to_string();
|
let uid_str = user_id.value().to_string();
|
||||||
let row = sqlx::query!(
|
let row = sqlx::query(
|
||||||
"SELECT id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at
|
"SELECT id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at
|
||||||
FROM import_sessions WHERE id = ? AND user_id = ?",
|
FROM import_sessions WHERE id = ? AND user_id = ?",
|
||||||
id_str,
|
|
||||||
uid_str
|
|
||||||
)
|
)
|
||||||
|
.bind(&id_str)
|
||||||
|
.bind(&uid_str)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
row.map(|r| {
|
row.map(|r| {
|
||||||
|
use sqlx::Row;
|
||||||
Self::deserialize_session(
|
Self::deserialize_session(
|
||||||
r.id,
|
r.get("id"),
|
||||||
r.user_id,
|
r.get("user_id"),
|
||||||
r.parsed_data,
|
r.get("parsed_data"),
|
||||||
r.field_mappings,
|
r.get("field_mappings"),
|
||||||
r.row_results,
|
r.get("row_results"),
|
||||||
&r.created_at,
|
&r.get::<String, _>("created_at"),
|
||||||
&r.expires_at,
|
&r.get::<String, _>("expires_at"),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.transpose()
|
.transpose()
|
||||||
@@ -347,12 +354,12 @@ impl ImportSessionRepository for SqliteImportSessionRepository {
|
|||||||
async fn update(&self, s: &ImportSession) -> Result<(), DomainError> {
|
async fn update(&self, s: &ImportSession) -> Result<(), DomainError> {
|
||||||
let id = s.id.value().to_string();
|
let id = s.id.value().to_string();
|
||||||
let (_, field_mappings, row_results) = Self::serialize_session(s)?;
|
let (_, field_mappings, row_results) = Self::serialize_session(s)?;
|
||||||
sqlx::query!(
|
sqlx::query(
|
||||||
"UPDATE import_sessions SET field_mappings = ?, row_results = ? WHERE id = ?",
|
"UPDATE import_sessions SET field_mappings = ?, row_results = ? WHERE id = ?",
|
||||||
field_mappings,
|
|
||||||
row_results,
|
|
||||||
id
|
|
||||||
)
|
)
|
||||||
|
.bind(&field_mappings)
|
||||||
|
.bind(&row_results)
|
||||||
|
.bind(&id)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
@@ -361,7 +368,8 @@ impl ImportSessionRepository for SqliteImportSessionRepository {
|
|||||||
|
|
||||||
async fn delete(&self, id: &ImportSessionId) -> Result<(), DomainError> {
|
async fn delete(&self, id: &ImportSessionId) -> Result<(), DomainError> {
|
||||||
let id_str = id.value().to_string();
|
let id_str = id.value().to_string();
|
||||||
sqlx::query!("DELETE FROM import_sessions WHERE id = ?", id_str)
|
sqlx::query("DELETE FROM import_sessions WHERE id = ?")
|
||||||
|
.bind(&id_str)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
@@ -369,7 +377,8 @@ impl ImportSessionRepository for SqliteImportSessionRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_expired(&self) -> Result<u64, DomainError> {
|
async fn delete_expired(&self) -> Result<u64, DomainError> {
|
||||||
let result = sqlx::query!("DELETE FROM import_sessions WHERE expires_at < datetime('now')")
|
let result =
|
||||||
|
sqlx::query("DELETE FROM import_sessions WHERE expires_at < datetime('now')")
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
@@ -378,10 +387,10 @@ impl ImportSessionRepository for SqliteImportSessionRepository {
|
|||||||
|
|
||||||
async fn delete_expired_for_user(&self, user_id: &UserId) -> Result<(), DomainError> {
|
async fn delete_expired_for_user(&self, user_id: &UserId) -> Result<(), DomainError> {
|
||||||
let uid = user_id.value().to_string();
|
let uid = user_id.value().to_string();
|
||||||
sqlx::query!(
|
sqlx::query(
|
||||||
"DELETE FROM import_sessions WHERE user_id = ? AND expires_at < datetime('now')",
|
"DELETE FROM import_sessions WHERE user_id = ? AND expires_at < datetime('now')",
|
||||||
uid
|
|
||||||
)
|
)
|
||||||
|
.bind(&uid)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
|
|||||||
@@ -96,14 +96,17 @@ impl SqliteMovieRepository {
|
|||||||
|
|
||||||
async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result<i64, DomainError> {
|
async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result<i64, DomainError> {
|
||||||
match movie_id {
|
match movie_id {
|
||||||
None => sqlx::query_scalar!("SELECT COUNT(*) FROM reviews")
|
None => sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews")
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err),
|
.map_err(Self::map_err),
|
||||||
Some(id) => sqlx::query_scalar!("SELECT COUNT(*) FROM reviews WHERE movie_id = ?", id)
|
Some(id) => {
|
||||||
|
sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = ?")
|
||||||
|
.bind(id)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err),
|
.map_err(Self::map_err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,13 +237,12 @@ impl SqliteMovieRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_user_totals(&self, user_id: &str) -> Result<UserTotalsRow, DomainError> {
|
async fn fetch_user_totals(&self, user_id: &str) -> Result<UserTotalsRow, DomainError> {
|
||||||
sqlx::query_as!(
|
sqlx::query_as::<_, UserTotalsRow>(
|
||||||
UserTotalsRow,
|
"SELECT COUNT(DISTINCT movie_id) AS total,
|
||||||
r#"SELECT COUNT(DISTINCT movie_id) AS "total!: i64",
|
|
||||||
AVG(CAST(rating AS REAL)) AS avg_rating
|
AVG(CAST(rating AS REAL)) AS avg_rating
|
||||||
FROM reviews WHERE user_id = ?"#,
|
FROM reviews WHERE user_id = ?",
|
||||||
user_id
|
|
||||||
)
|
)
|
||||||
|
.bind(user_id)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)
|
.map_err(Self::map_err)
|
||||||
@@ -250,7 +252,7 @@ impl SqliteMovieRepository {
|
|||||||
&self,
|
&self,
|
||||||
user_id: &str,
|
user_id: &str,
|
||||||
) -> Result<Option<String>, DomainError> {
|
) -> Result<Option<String>, DomainError> {
|
||||||
let row = sqlx::query_scalar!(
|
let row: Option<String> = sqlx::query_scalar(
|
||||||
"SELECT m.director
|
"SELECT m.director
|
||||||
FROM reviews r
|
FROM reviews r
|
||||||
INNER JOIN movies m ON m.id = r.movie_id
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
@@ -258,31 +260,31 @@ impl SqliteMovieRepository {
|
|||||||
GROUP BY m.director
|
GROUP BY m.director
|
||||||
ORDER BY COUNT(*) DESC
|
ORDER BY COUNT(*) DESC
|
||||||
LIMIT 1",
|
LIMIT 1",
|
||||||
user_id
|
|
||||||
)
|
)
|
||||||
|
.bind(user_id)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
Ok(row.flatten())
|
Ok(row)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_user_most_active_month(
|
async fn fetch_user_most_active_month(
|
||||||
&self,
|
&self,
|
||||||
user_id: &str,
|
user_id: &str,
|
||||||
) -> Result<Option<String>, DomainError> {
|
) -> Result<Option<String>, DomainError> {
|
||||||
let result: Option<Option<String>> = sqlx::query_scalar!(
|
let row: Option<String> = sqlx::query_scalar(
|
||||||
"SELECT strftime('%Y-%m', watched_at) AS month
|
"SELECT strftime('%Y-%m', watched_at)
|
||||||
FROM reviews
|
FROM reviews
|
||||||
WHERE user_id = ?
|
WHERE user_id = ?
|
||||||
GROUP BY month
|
GROUP BY strftime('%Y-%m', watched_at)
|
||||||
ORDER BY COUNT(*) DESC
|
ORDER BY COUNT(*) DESC
|
||||||
LIMIT 1",
|
LIMIT 1",
|
||||||
user_id
|
|
||||||
)
|
)
|
||||||
|
.bind(user_id)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
Ok(result.flatten())
|
Ok(row)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -293,12 +295,11 @@ impl MovieRepository for SqliteMovieRepository {
|
|||||||
external_metadata_id: &ExternalMetadataId,
|
external_metadata_id: &ExternalMetadataId,
|
||||||
) -> Result<Option<Movie>, DomainError> {
|
) -> Result<Option<Movie>, DomainError> {
|
||||||
let id = external_metadata_id.value();
|
let id = external_metadata_id.value();
|
||||||
sqlx::query_as!(
|
sqlx::query_as::<_, MovieRow>(
|
||||||
MovieRow,
|
|
||||||
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
||||||
FROM movies WHERE external_metadata_id = ?",
|
FROM movies WHERE external_metadata_id = ?",
|
||||||
id
|
|
||||||
)
|
)
|
||||||
|
.bind(id)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?
|
.map_err(Self::map_err)?
|
||||||
@@ -308,12 +309,11 @@ impl MovieRepository for SqliteMovieRepository {
|
|||||||
|
|
||||||
async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result<Option<Movie>, DomainError> {
|
async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result<Option<Movie>, DomainError> {
|
||||||
let id = movie_id.value().to_string();
|
let id = movie_id.value().to_string();
|
||||||
sqlx::query_as!(
|
sqlx::query_as::<_, MovieRow>(
|
||||||
MovieRow,
|
|
||||||
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
||||||
FROM movies WHERE id = ?",
|
FROM movies WHERE id = ?",
|
||||||
id
|
|
||||||
)
|
)
|
||||||
|
.bind(&id)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?
|
.map_err(Self::map_err)?
|
||||||
@@ -326,15 +326,14 @@ impl MovieRepository for SqliteMovieRepository {
|
|||||||
title: &MovieTitle,
|
title: &MovieTitle,
|
||||||
year: &ReleaseYear,
|
year: &ReleaseYear,
|
||||||
) -> Result<Vec<Movie>, DomainError> {
|
) -> Result<Vec<Movie>, DomainError> {
|
||||||
let title = title.value();
|
let t = title.value();
|
||||||
let year = year.value() as i64;
|
let y = year.value() as i64;
|
||||||
sqlx::query_as!(
|
sqlx::query_as::<_, MovieRow>(
|
||||||
MovieRow,
|
|
||||||
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
||||||
FROM movies WHERE title = ? AND release_year = ?",
|
FROM movies WHERE title = ? AND release_year = ?",
|
||||||
title,
|
|
||||||
year
|
|
||||||
)
|
)
|
||||||
|
.bind(t)
|
||||||
|
.bind(y)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?
|
.map_err(Self::map_err)?
|
||||||
@@ -351,7 +350,7 @@ impl MovieRepository for SqliteMovieRepository {
|
|||||||
let director = movie.director();
|
let director = movie.director();
|
||||||
let poster_path = movie.poster_path().map(|p| p.value().to_string());
|
let poster_path = movie.poster_path().map(|p| p.value().to_string());
|
||||||
|
|
||||||
sqlx::query!(
|
sqlx::query(
|
||||||
"INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path)
|
"INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path)
|
||||||
VALUES (?, ?, ?, ?, ?, ?)
|
VALUES (?, ?, ?, ?, ?, ?)
|
||||||
ON CONFLICT(id) DO UPDATE SET
|
ON CONFLICT(id) DO UPDATE SET
|
||||||
@@ -360,13 +359,13 @@ impl MovieRepository for SqliteMovieRepository {
|
|||||||
release_year = excluded.release_year,
|
release_year = excluded.release_year,
|
||||||
director = excluded.director,
|
director = excluded.director,
|
||||||
poster_path = excluded.poster_path",
|
poster_path = excluded.poster_path",
|
||||||
id,
|
|
||||||
external_metadata_id,
|
|
||||||
title,
|
|
||||||
release_year,
|
|
||||||
director,
|
|
||||||
poster_path
|
|
||||||
)
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&external_metadata_id)
|
||||||
|
.bind(title)
|
||||||
|
.bind(release_year)
|
||||||
|
.bind(director)
|
||||||
|
.bind(&poster_path)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
@@ -376,7 +375,8 @@ impl MovieRepository for SqliteMovieRepository {
|
|||||||
|
|
||||||
async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> {
|
async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> {
|
||||||
let id = movie_id.value().to_string();
|
let id = movie_id.value().to_string();
|
||||||
sqlx::query!("DELETE FROM movies WHERE id = ?", id)
|
sqlx::query("DELETE FROM movies WHERE id = ?")
|
||||||
|
.bind(&id)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
@@ -527,18 +527,18 @@ impl ReviewRepository for SqliteMovieRepository {
|
|||||||
ReviewSource::Remote { actor_url } => Some(actor_url.clone()),
|
ReviewSource::Remote { actor_url } => Some(actor_url.clone()),
|
||||||
};
|
};
|
||||||
|
|
||||||
sqlx::query!(
|
sqlx::query(
|
||||||
"INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url)
|
"INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||||
id,
|
|
||||||
movie_id,
|
|
||||||
user_id,
|
|
||||||
rating,
|
|
||||||
comment,
|
|
||||||
watched_at,
|
|
||||||
created_at,
|
|
||||||
remote_actor_url
|
|
||||||
)
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&movie_id)
|
||||||
|
.bind(&user_id)
|
||||||
|
.bind(rating)
|
||||||
|
.bind(&comment)
|
||||||
|
.bind(&watched_at)
|
||||||
|
.bind(&created_at)
|
||||||
|
.bind(&remote_actor_url)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
@@ -554,12 +554,11 @@ impl ReviewRepository for SqliteMovieRepository {
|
|||||||
|
|
||||||
async fn get_review_by_id(&self, review_id: &ReviewId) -> Result<Option<Review>, DomainError> {
|
async fn get_review_by_id(&self, review_id: &ReviewId) -> Result<Option<Review>, DomainError> {
|
||||||
let id = review_id.value().to_string();
|
let id = review_id.value().to_string();
|
||||||
sqlx::query_as!(
|
sqlx::query_as::<_, ReviewRow>(
|
||||||
ReviewRow,
|
|
||||||
"SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url
|
"SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url
|
||||||
FROM reviews WHERE id = ?",
|
FROM reviews WHERE id = ?",
|
||||||
id
|
|
||||||
)
|
)
|
||||||
|
.bind(&id)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?
|
.map_err(Self::map_err)?
|
||||||
@@ -569,7 +568,8 @@ impl ReviewRepository for SqliteMovieRepository {
|
|||||||
|
|
||||||
async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> {
|
async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> {
|
||||||
let id = review_id.value().to_string();
|
let id = review_id.value().to_string();
|
||||||
sqlx::query!("DELETE FROM reviews WHERE id = ?", id)
|
sqlx::query("DELETE FROM reviews WHERE id = ?")
|
||||||
|
.bind(&id)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
@@ -760,24 +760,22 @@ impl DiaryRepository for SqliteMovieRepository {
|
|||||||
async fn get_review_history(&self, movie_id: &MovieId) -> Result<ReviewHistory, DomainError> {
|
async fn get_review_history(&self, movie_id: &MovieId) -> Result<ReviewHistory, DomainError> {
|
||||||
let id_str = movie_id.value().to_string();
|
let id_str = movie_id.value().to_string();
|
||||||
|
|
||||||
let movie = sqlx::query_as!(
|
let movie = sqlx::query_as::<_, MovieRow>(
|
||||||
MovieRow,
|
|
||||||
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
||||||
FROM movies WHERE id = ?",
|
FROM movies WHERE id = ?",
|
||||||
id_str
|
|
||||||
)
|
)
|
||||||
|
.bind(&id_str)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?
|
.map_err(Self::map_err)?
|
||||||
.ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))?
|
.ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))?
|
||||||
.into_domain()?;
|
.into_domain()?;
|
||||||
|
|
||||||
let viewings = sqlx::query_as!(
|
let viewings = sqlx::query_as::<_, ReviewRow>(
|
||||||
ReviewRow,
|
|
||||||
"SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url
|
"SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url
|
||||||
FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC",
|
FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC",
|
||||||
id_str
|
|
||||||
)
|
)
|
||||||
|
.bind(&id_str)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?
|
.map_err(Self::map_err)?
|
||||||
@@ -790,16 +788,15 @@ impl DiaryRepository for SqliteMovieRepository {
|
|||||||
|
|
||||||
async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
|
async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
|
||||||
let uid = user_id.value().to_string();
|
let uid = user_id.value().to_string();
|
||||||
let rows = sqlx::query_as!(
|
let rows = sqlx::query_as::<_, DiaryRow>(
|
||||||
DiaryRow,
|
|
||||||
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||||
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url
|
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url
|
||||||
FROM reviews r
|
FROM reviews r
|
||||||
INNER JOIN movies m ON m.id = r.movie_id
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
WHERE r.user_id = ?
|
WHERE r.user_id = ?
|
||||||
ORDER BY r.watched_at DESC",
|
ORDER BY r.watched_at DESC",
|
||||||
uid
|
|
||||||
)
|
)
|
||||||
|
.bind(&uid)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
@@ -837,7 +834,9 @@ impl DiaryRepository for SqliteMovieRepository {
|
|||||||
let limit = page.limit as i64;
|
let limit = page.limit as i64;
|
||||||
let offset = page.offset as i64;
|
let offset = page.offset as i64;
|
||||||
|
|
||||||
let total = sqlx::query_scalar!("SELECT COUNT(*) FROM reviews WHERE movie_id = ?", id_str)
|
let total: i64 =
|
||||||
|
sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = ?")
|
||||||
|
.bind(&id_str)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
@@ -911,30 +910,28 @@ impl StatsRepository for SqliteMovieRepository {
|
|||||||
let uid = user_id.value().to_string();
|
let uid = user_id.value().to_string();
|
||||||
|
|
||||||
let (rating_rows, director_rows) = tokio::try_join!(
|
let (rating_rows, director_rows) = tokio::try_join!(
|
||||||
sqlx::query_as!(
|
sqlx::query_as::<_, MonthlyRatingRow>(
|
||||||
MonthlyRatingRow,
|
"SELECT strftime('%Y-%m', watched_at) AS month,
|
||||||
r#"SELECT strftime('%Y-%m', watched_at) AS "month!",
|
AVG(CAST(rating AS REAL)) AS avg_rating,
|
||||||
AVG(CAST(rating AS REAL)) AS "avg_rating!: f64",
|
COUNT(*) AS count
|
||||||
COUNT(*) AS "count!: i64"
|
|
||||||
FROM reviews
|
FROM reviews
|
||||||
WHERE user_id = ? AND watched_at >= datetime('now', '-12 months')
|
WHERE user_id = ? AND watched_at >= datetime('now', '-12 months')
|
||||||
GROUP BY "month!"
|
GROUP BY month
|
||||||
ORDER BY "month!" ASC"#,
|
ORDER BY month ASC",
|
||||||
uid
|
|
||||||
)
|
)
|
||||||
|
.bind(&uid)
|
||||||
.fetch_all(&self.pool),
|
.fetch_all(&self.pool),
|
||||||
sqlx::query_as!(
|
sqlx::query_as::<_, DirectorCountRow>(
|
||||||
DirectorCountRow,
|
"SELECT m.director,
|
||||||
r#"SELECT m.director AS "director!",
|
COUNT(*) AS count
|
||||||
COUNT(*) AS "count!: i64"
|
|
||||||
FROM reviews r
|
FROM reviews r
|
||||||
INNER JOIN movies m ON m.id = r.movie_id
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
WHERE r.user_id = ? AND m.director IS NOT NULL
|
WHERE r.user_id = ? AND m.director IS NOT NULL
|
||||||
GROUP BY m.director
|
GROUP BY m.director
|
||||||
ORDER BY COUNT(*) DESC
|
ORDER BY COUNT(*) DESC
|
||||||
LIMIT 5"#,
|
LIMIT 5",
|
||||||
uid
|
|
||||||
)
|
)
|
||||||
|
.bind(&uid)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
)
|
)
|
||||||
.map_err(Self::map_err)?;
|
.map_err(Self::map_err)?;
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use sqlx::SqlitePool;
|
use sqlx::{Row, SqlitePool};
|
||||||
|
|
||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError, models::ProfileField, ports::UserProfileFieldsRepository,
|
errors::DomainError, models::ProfileField, ports::UserProfileFieldsRepository,
|
||||||
@@ -14,25 +14,30 @@ impl SqliteProfileFieldsRepository {
|
|||||||
pub fn new(pool: SqlitePool) -> Self {
|
pub fn new(pool: SqlitePool) -> Self {
|
||||||
Self { pool }
|
Self { pool }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn map_err(e: sqlx::Error) -> DomainError {
|
||||||
|
tracing::error!("Database error: {:?}", e);
|
||||||
|
DomainError::InfrastructureError("Database operation failed".into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl UserProfileFieldsRepository for SqliteProfileFieldsRepository {
|
impl UserProfileFieldsRepository for SqliteProfileFieldsRepository {
|
||||||
async fn get_fields(&self, user_id: &UserId) -> Result<Vec<ProfileField>, DomainError> {
|
async fn get_fields(&self, user_id: &UserId) -> Result<Vec<ProfileField>, DomainError> {
|
||||||
let id_str = user_id.value().to_string();
|
let id_str = user_id.value().to_string();
|
||||||
let rows = sqlx::query!(
|
let rows = sqlx::query(
|
||||||
"SELECT name, value FROM user_profile_fields WHERE user_id = ? ORDER BY position ASC",
|
"SELECT name, value FROM user_profile_fields WHERE user_id = ? ORDER BY position ASC",
|
||||||
id_str
|
|
||||||
)
|
)
|
||||||
|
.bind(&id_str)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
Ok(rows
|
Ok(rows
|
||||||
.into_iter()
|
.iter()
|
||||||
.map(|r| ProfileField {
|
.map(|r| ProfileField {
|
||||||
name: r.name,
|
name: r.get("name"),
|
||||||
value: r.value,
|
value: r.get("value"),
|
||||||
})
|
})
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
@@ -44,21 +49,26 @@ impl UserProfileFieldsRepository for SqliteProfileFieldsRepository {
|
|||||||
) -> Result<(), DomainError> {
|
) -> Result<(), DomainError> {
|
||||||
let id_str = user_id.value().to_string();
|
let id_str = user_id.value().to_string();
|
||||||
|
|
||||||
sqlx::query!("DELETE FROM user_profile_fields WHERE user_id = ?", id_str)
|
sqlx::query("DELETE FROM user_profile_fields WHERE user_id = ?")
|
||||||
|
.bind(&id_str)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
for (i, field) in fields.into_iter().enumerate() {
|
for (i, field) in fields.into_iter().enumerate() {
|
||||||
let id = uuid::Uuid::new_v4().to_string();
|
let id = uuid::Uuid::new_v4().to_string();
|
||||||
let position = i as i64;
|
let position = i as i64;
|
||||||
sqlx::query!(
|
sqlx::query(
|
||||||
"INSERT INTO user_profile_fields (id, user_id, name, value, position) VALUES (?, ?, ?, ?, ?)",
|
"INSERT INTO user_profile_fields (id, user_id, name, value, position) VALUES (?, ?, ?, ?, ?)",
|
||||||
id, id_str, field.name, field.value, position
|
|
||||||
)
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&id_str)
|
||||||
|
.bind(&field.name)
|
||||||
|
.bind(&field.value)
|
||||||
|
.bind(position)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
.map_err(Self::map_err)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
Reference in New Issue
Block a user