rewrite to sqlx
This commit is contained in:
@@ -1,11 +1,7 @@
|
||||
use mongodb::{
|
||||
Client,
|
||||
bson::{doc, to_document},
|
||||
options::UpdateOptions,
|
||||
};
|
||||
use reqwest::Client as HttpClient;
|
||||
use rick_and_morty::models::{Character, OriginOrLocation};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{Row, SqlitePool};
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct ApiCharacter {
|
||||
@@ -27,17 +23,20 @@ struct ApiCharacter {
|
||||
impl From<ApiCharacter> for Character {
|
||||
fn from(api: ApiCharacter) -> Self {
|
||||
Character {
|
||||
id: None, // always None for new/incoming data
|
||||
id: 0, // Ignored for new/incoming data; SQLite autoincrements
|
||||
rmid: api.id,
|
||||
name: api.name,
|
||||
status: api.status,
|
||||
species: api.species,
|
||||
r#type: api.character_type,
|
||||
gender: api.gender,
|
||||
origin: api.origin,
|
||||
location: api.location,
|
||||
origin_name: api.origin.name,
|
||||
origin_url: api.origin.url,
|
||||
location_name: api.location.name,
|
||||
location_url: api.location.url,
|
||||
image: api.image,
|
||||
episode: api.episode,
|
||||
// Store as JSON string
|
||||
episode: serde_json::to_string(&api.episode).unwrap(),
|
||||
url: api.url,
|
||||
created: api.created,
|
||||
elo_rating: 1000.0,
|
||||
@@ -58,13 +57,9 @@ fn init_tracing() {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
init_tracing();
|
||||
dotenvy::dotenv().ok();
|
||||
let db_uri =
|
||||
std::env::var("MONGODB_URI").unwrap_or_else(|_| "mongodb://localhost:27017".to_string());
|
||||
let db_name = std::env::var("DB_NAME").unwrap_or_else(|_| "rick_and_morty".to_string());
|
||||
|
||||
let client = Client::with_uri_str(&db_uri).await?;
|
||||
let db = client.database(&db_name);
|
||||
let collection = db.collection::<Character>("characters");
|
||||
let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL not set");
|
||||
let pool = SqlitePool::connect(&db_url).await?;
|
||||
|
||||
tracing::info!("Starting to fetch characters from Rick and Morty API");
|
||||
let http = HttpClient::new();
|
||||
@@ -92,30 +87,77 @@ async fn main() -> anyhow::Result<()> {
|
||||
count = all_characters.len(),
|
||||
"Fetched all characters, starting DB upsert"
|
||||
);
|
||||
let options = UpdateOptions::builder().upsert(true).build();
|
||||
// let insert_result = collection.insert_many(all_characters.clone()).await?;
|
||||
|
||||
for character in &all_characters {
|
||||
let filter = doc! { "rmid": character.rmid };
|
||||
let mut set_doc = to_document(character)?;
|
||||
set_doc.remove("elo_rating"); // Do NOT overwrite existing Elo
|
||||
let update = doc! {
|
||||
"$set": set_doc,
|
||||
"$setOnInsert": { "elo_rating": 1000.0 }
|
||||
};
|
||||
if let Err(e) = collection
|
||||
.update_one(filter, update)
|
||||
.with_options(Some(options.clone()))
|
||||
.await
|
||||
{
|
||||
tracing::error!(error = ?e, id = character.rmid, name = %character.name, "Failed to upsert character");
|
||||
// Try to fetch existing character by rmid
|
||||
let row = sqlx::query("SELECT id, elo_rating FROM characters WHERE rmid = ?")
|
||||
.bind(character.rmid)
|
||||
.fetch_optional(&pool)
|
||||
.await?;
|
||||
|
||||
if let Some(row) = row {
|
||||
// Exists: update all fields except elo_rating
|
||||
let id: i64 = row.get("id");
|
||||
sqlx::query(
|
||||
"UPDATE characters SET
|
||||
name = ?, status = ?, species = ?, type = ?, gender = ?,
|
||||
origin_name = ?, origin_url = ?,
|
||||
location_name = ?, location_url = ?,
|
||||
image = ?, episode = ?, url = ?, created = ?
|
||||
WHERE id = ?",
|
||||
)
|
||||
.bind(&character.name)
|
||||
.bind(&character.status)
|
||||
.bind(&character.species)
|
||||
.bind(&character.r#type)
|
||||
.bind(&character.gender)
|
||||
.bind(&character.origin_name)
|
||||
.bind(&character.origin_url)
|
||||
.bind(&character.location_name)
|
||||
.bind(&character.location_url)
|
||||
.bind(&character.image)
|
||||
.bind(&character.episode)
|
||||
.bind(&character.url)
|
||||
.bind(&character.created)
|
||||
.bind(id)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
tracing::info!(id = character.rmid, name = %character.name, "Updated character (no elo overwrite)");
|
||||
} else {
|
||||
// Insert new character with default elo_rating
|
||||
sqlx::query(
|
||||
"INSERT INTO characters (
|
||||
rmid, name, status, species, type, gender,
|
||||
origin_name, origin_url, location_name, location_url,
|
||||
image, episode, url, created, elo_rating
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(character.rmid)
|
||||
.bind(&character.name)
|
||||
.bind(&character.status)
|
||||
.bind(&character.species)
|
||||
.bind(&character.r#type)
|
||||
.bind(&character.gender)
|
||||
.bind(&character.origin_name)
|
||||
.bind(&character.origin_url)
|
||||
.bind(&character.location_name)
|
||||
.bind(&character.location_url)
|
||||
.bind(&character.image)
|
||||
.bind(&character.episode)
|
||||
.bind(&character.url)
|
||||
.bind(&character.created)
|
||||
.bind(1000.0)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
tracing::info!(id = character.rmid, name = %character.name, "Inserted new character");
|
||||
}
|
||||
tracing::info!(id = character.rmid, name = %character.name, "Upserted character");
|
||||
}
|
||||
|
||||
// tracing::info!("Inserted {} characters", insert_result.inserted_ids.len());
|
||||
|
||||
let character_count = collection.count_documents(doc! {}).await?;
|
||||
let character_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM characters")
|
||||
.fetch_one(&pool)
|
||||
.await?;
|
||||
tracing::info!(
|
||||
count = character_count,
|
||||
"Total characters in DB after import"
|
||||
|
Reference in New Issue
Block a user