From 3a3f3b38896ad3ae746eb71c43f47ea9169499b4 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 11 Jun 2026 14:31:46 +0200 Subject: [PATCH] db: refresh_sessions migration + SQLite/Postgres adapters --- .../migrations/0027_refresh_sessions.sql | 10 ++ crates/adapters/postgres/src/lib.rs | 2 + .../adapters/postgres/src/refresh_sessions.rs | 114 ++++++++++++++++++ .../migrations/0027_refresh_sessions.sql | 10 ++ crates/adapters/sqlite/src/lib.rs | 2 + .../adapters/sqlite/src/refresh_sessions.rs | 113 +++++++++++++++++ crates/presentation/src/factory.rs | 4 +- crates/worker/src/db.rs | 4 +- 8 files changed, 255 insertions(+), 4 deletions(-) create mode 100644 crates/adapters/postgres/migrations/0027_refresh_sessions.sql create mode 100644 crates/adapters/postgres/src/refresh_sessions.rs create mode 100644 crates/adapters/sqlite/migrations/0027_refresh_sessions.sql create mode 100644 crates/adapters/sqlite/src/refresh_sessions.rs diff --git a/crates/adapters/postgres/migrations/0027_refresh_sessions.sql b/crates/adapters/postgres/migrations/0027_refresh_sessions.sql new file mode 100644 index 0000000..26e97a9 --- /dev/null +++ b/crates/adapters/postgres/migrations/0027_refresh_sessions.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS refresh_sessions ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + token TEXT NOT NULL UNIQUE, + expires_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_refresh_sessions_token ON refresh_sessions(token); +CREATE INDEX IF NOT EXISTS idx_refresh_sessions_user_id ON refresh_sessions(user_id); +CREATE INDEX IF NOT EXISTS idx_refresh_sessions_expires_at ON refresh_sessions(expires_at); diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index c45996a..321f0cd 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -21,6 +21,7 @@ mod models; mod persons; mod profile; mod profile_fields; +mod refresh_sessions; mod remote_goals; mod user_settings; mod users; @@ -39,6 +40,7 @@ pub use import_profile::PostgresImportProfileRepository; pub use import_session::PostgresImportSessionRepository; pub use persons::{PostgresPersonAdapter, create_person_adapter}; pub use profile::PostgresMovieProfileRepository; +pub use refresh_sessions::PostgresRefreshSessionAdapter; pub use profile_fields::PostgresProfileFieldsRepository; pub use users::PostgresUserRepository; pub use watch_event::{PostgresWatchEventRepository, PostgresWebhookTokenRepository}; diff --git a/crates/adapters/postgres/src/refresh_sessions.rs b/crates/adapters/postgres/src/refresh_sessions.rs new file mode 100644 index 0000000..4f916fd --- /dev/null +++ b/crates/adapters/postgres/src/refresh_sessions.rs @@ -0,0 +1,114 @@ +use async_trait::async_trait; +use chrono::DateTime; +use domain::{ + errors::DomainError, + models::RefreshSession, + ports::RefreshSessionRepository, + value_objects::UserId, +}; +use sqlx::PgPool; + +pub struct PostgresRefreshSessionAdapter { + pool: PgPool, +} + +impl PostgresRefreshSessionAdapter { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +fn map_err(e: sqlx::Error) -> DomainError { + DomainError::InfrastructureError(e.to_string()) +} + +#[async_trait] +impl RefreshSessionRepository for PostgresRefreshSessionAdapter { + async fn create(&self, session: &RefreshSession) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO refresh_sessions (id, user_id, token, expires_at, created_at) + VALUES ($1, $2, $3, $4, $5)", + ) + .bind(session.id.to_string()) + .bind(session.user_id.value().to_string()) + .bind(&session.token) + .bind(session.expires_at) + .bind(session.created_at) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(()) + } + + async fn get_by_token(&self, token: &str) -> Result, DomainError> { + let row = sqlx::query_as::<_, RefreshSessionRow>( + "SELECT id, user_id, token, + to_char(expires_at AT TIME ZONE 'UTC', 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') AS expires_at, + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') AS created_at + FROM refresh_sessions WHERE token = $1", + ) + .bind(token) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.map(RefreshSessionRow::into_domain).transpose() + } + + async fn revoke(&self, token: &str) -> Result<(), DomainError> { + sqlx::query("DELETE FROM refresh_sessions WHERE token = $1") + .bind(token) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(()) + } + + async fn revoke_all_for_user(&self, user_id: &UserId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM refresh_sessions WHERE user_id = $1") + .bind(user_id.value().to_string()) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(()) + } + + async fn delete_expired(&self) -> Result { + let result = sqlx::query("DELETE FROM refresh_sessions WHERE expires_at < NOW()") + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(result.rows_affected()) + } +} + +#[derive(sqlx::FromRow)] +struct RefreshSessionRow { + id: String, + user_id: String, + token: String, + expires_at: String, + created_at: String, +} + +impl RefreshSessionRow { + fn into_domain(self) -> Result { + let id = uuid::Uuid::parse_str(&self.id) + .map_err(|e| DomainError::InfrastructureError(format!("invalid uuid: {e}")))?; + let user_id = uuid::Uuid::parse_str(&self.user_id) + .map_err(|e| DomainError::InfrastructureError(format!("invalid user_id: {e}")))?; + let expires_at = DateTime::parse_from_rfc3339(&self.expires_at) + .map_err(|e| DomainError::InfrastructureError(format!("invalid expires_at: {e}")))? + .with_timezone(&chrono::Utc); + let created_at = DateTime::parse_from_rfc3339(&self.created_at) + .map_err(|e| DomainError::InfrastructureError(format!("invalid created_at: {e}")))? + .with_timezone(&chrono::Utc); + Ok(RefreshSession { + id, + user_id: UserId::from_uuid(user_id), + token: self.token, + expires_at, + created_at, + }) + } +} diff --git a/crates/adapters/sqlite/migrations/0027_refresh_sessions.sql b/crates/adapters/sqlite/migrations/0027_refresh_sessions.sql new file mode 100644 index 0000000..df1c45e --- /dev/null +++ b/crates/adapters/sqlite/migrations/0027_refresh_sessions.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS refresh_sessions ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + token TEXT NOT NULL UNIQUE, + expires_at TEXT NOT NULL, + created_at TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_refresh_sessions_token ON refresh_sessions(token); +CREATE INDEX IF NOT EXISTS idx_refresh_sessions_user_id ON refresh_sessions(user_id); +CREATE INDEX IF NOT EXISTS idx_refresh_sessions_expires_at ON refresh_sessions(expires_at); diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index e342a37..89f97ab 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -22,6 +22,7 @@ mod models; mod persons; mod profile; mod profile_fields; +mod refresh_sessions; mod remote_goals; mod user_settings; mod users; @@ -40,6 +41,7 @@ pub use import_profile::SqliteImportProfileRepository; pub use import_session::SqliteImportSessionRepository; pub use persons::{SqlitePersonAdapter, create_person_adapter}; pub use profile::SqliteMovieProfileRepository; +pub use refresh_sessions::SqliteRefreshSessionAdapter; pub use profile_fields::SqliteProfileFieldsRepository; pub use users::SqliteUserRepository; pub use watch_event::{SqliteWatchEventRepository, SqliteWebhookTokenRepository}; diff --git a/crates/adapters/sqlite/src/refresh_sessions.rs b/crates/adapters/sqlite/src/refresh_sessions.rs new file mode 100644 index 0000000..dd6cd8d --- /dev/null +++ b/crates/adapters/sqlite/src/refresh_sessions.rs @@ -0,0 +1,113 @@ +use async_trait::async_trait; +use chrono::DateTime; +use domain::{ + errors::DomainError, + models::RefreshSession, + ports::RefreshSessionRepository, + value_objects::UserId, +}; +use sqlx::SqlitePool; + +pub struct SqliteRefreshSessionAdapter { + pool: SqlitePool, +} + +impl SqliteRefreshSessionAdapter { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +fn map_err(e: sqlx::Error) -> DomainError { + DomainError::InfrastructureError(e.to_string()) +} + +#[async_trait] +impl RefreshSessionRepository for SqliteRefreshSessionAdapter { + async fn create(&self, session: &RefreshSession) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO refresh_sessions (id, user_id, token, expires_at, created_at) + VALUES (?, ?, ?, ?, ?)", + ) + .bind(session.id.to_string()) + .bind(session.user_id.value().to_string()) + .bind(&session.token) + .bind(session.expires_at.to_rfc3339()) + .bind(session.created_at.to_rfc3339()) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(()) + } + + async fn get_by_token(&self, token: &str) -> Result, DomainError> { + let row = sqlx::query_as::<_, RefreshSessionRow>( + "SELECT id, user_id, token, expires_at, created_at FROM refresh_sessions WHERE token = ?", + ) + .bind(token) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.map(RefreshSessionRow::into_domain).transpose() + } + + async fn revoke(&self, token: &str) -> Result<(), DomainError> { + sqlx::query("DELETE FROM refresh_sessions WHERE token = ?") + .bind(token) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(()) + } + + async fn revoke_all_for_user(&self, user_id: &UserId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM refresh_sessions WHERE user_id = ?") + .bind(user_id.value().to_string()) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(()) + } + + async fn delete_expired(&self) -> Result { + let now = chrono::Utc::now().to_rfc3339(); + let result = sqlx::query("DELETE FROM refresh_sessions WHERE expires_at < ?") + .bind(&now) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(result.rows_affected()) + } +} + +#[derive(sqlx::FromRow)] +struct RefreshSessionRow { + id: String, + user_id: String, + token: String, + expires_at: String, + created_at: String, +} + +impl RefreshSessionRow { + fn into_domain(self) -> Result { + let id = uuid::Uuid::parse_str(&self.id) + .map_err(|e| DomainError::InfrastructureError(format!("invalid uuid: {e}")))?; + let user_id = uuid::Uuid::parse_str(&self.user_id) + .map_err(|e| DomainError::InfrastructureError(format!("invalid user_id: {e}")))?; + let expires_at = DateTime::parse_from_rfc3339(&self.expires_at) + .map_err(|e| DomainError::InfrastructureError(format!("invalid expires_at: {e}")))? + .with_timezone(&chrono::Utc); + let created_at = DateTime::parse_from_rfc3339(&self.created_at) + .map_err(|e| DomainError::InfrastructureError(format!("invalid created_at: {e}")))? + .with_timezone(&chrono::Utc); + Ok(RefreshSession { + id, + user_id: UserId::from_uuid(user_id), + token: self.token, + expires_at, + created_at, + }) + } +} diff --git a/crates/presentation/src/factory.rs b/crates/presentation/src/factory.rs index 96ab018..1757956 100644 --- a/crates/presentation/src/factory.rs +++ b/crates/presentation/src/factory.rs @@ -79,7 +79,7 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result goal: w.goal, user_settings: w.user_settings, remote_goal: w.remote_goal, - refresh_session: Arc::new(domain::testing::PanicRefreshSessionRepository) as _, + refresh_session: Arc::new(postgres::PostgresRefreshSessionAdapter::new(w.pool.clone())) as _, db_pool: DbPool::Postgres(w.pool), }) } @@ -118,7 +118,7 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result goal: w.goal, user_settings: w.user_settings, remote_goal: w.remote_goal, - refresh_session: Arc::new(domain::testing::PanicRefreshSessionRepository) as _, + refresh_session: Arc::new(sqlite::SqliteRefreshSessionAdapter::new(w.pool.clone())) as _, db_pool: DbPool::Sqlite(w.pool), }) } diff --git a/crates/worker/src/db.rs b/crates/worker/src/db.rs index 7953bc7..76efd91 100644 --- a/crates/worker/src/db.rs +++ b/crates/worker/src/db.rs @@ -87,7 +87,7 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result anyhow::Result