feat: initialize k-tv-frontend with Next.js and Tailwind CSS
- Added package.json with dependencies and scripts for development, build, and linting. - Created postcss.config.mjs for Tailwind CSS integration. - Added SVG assets for UI components including file, globe, next, vercel, and window icons. - Configured TypeScript with tsconfig.json for strict type checking and module resolution.
This commit is contained in:
48
k-tv-backend/infra/Cargo.toml
Normal file
48
k-tv-backend/infra/Cargo.toml
Normal file
@@ -0,0 +1,48 @@
|
||||
[package]
|
||||
name = "infra"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
default = ["sqlite", "jellyfin"]
|
||||
sqlite = ["sqlx/sqlite", "k-core/sqlite"]
|
||||
postgres = ["sqlx/postgres", "k-core/postgres"]
|
||||
broker-nats = ["dep:futures-util", "k-core/broker-nats"]
|
||||
auth-oidc = ["dep:openidconnect", "dep:url", "dep:axum-extra"]
|
||||
auth-jwt = ["dep:jsonwebtoken"]
|
||||
jellyfin = ["dep:reqwest"]
|
||||
|
||||
[dependencies]
|
||||
k-core = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-core", features = [
|
||||
"logging",
|
||||
"db-sqlx",
|
||||
] }
|
||||
domain = { path = "../domain" }
|
||||
|
||||
async-trait = "0.1.89"
|
||||
chrono = { version = "0.4.42", features = ["serde"] }
|
||||
sqlx = { version = "0.8.6", features = ["runtime-tokio", "chrono", "migrate"] }
|
||||
thiserror = "2.0.17"
|
||||
anyhow = "1.0"
|
||||
tokio = { version = "1.48.0", features = ["full"] }
|
||||
tracing = "0.1"
|
||||
uuid = { version = "1.19.0", features = ["v4", "serde"] }
|
||||
serde_json = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
futures-core = "0.3"
|
||||
password-auth = "1.0"
|
||||
|
||||
# Optional dependencies
|
||||
async-nats = { version = "0.45", optional = true }
|
||||
futures-util = { version = "0.3", optional = true }
|
||||
openidconnect = { version = "4.0.1", optional = true }
|
||||
url = { version = "2.5.8", optional = true }
|
||||
axum-extra = { version = "0.10", features = ["cookie-private"], optional = true }
|
||||
reqwest = { version = "0.12", features = ["json"], optional = true }
|
||||
jsonwebtoken = { version = "10.2.0", features = [
|
||||
"sha2",
|
||||
"p256",
|
||||
"hmac",
|
||||
"rsa",
|
||||
"rust_crypto",
|
||||
], optional = true }
|
||||
40
k-tv-backend/infra/Cargo.toml.template
Normal file
40
k-tv-backend/infra/Cargo.toml.template
Normal file
@@ -0,0 +1,40 @@
|
||||
[package]
|
||||
name = "infra"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
default = ["sqlite", "auth-jwt"]
|
||||
sqlite = ["sqlx/sqlite", "k-core/sqlite"]
|
||||
postgres = ["sqlx/postgres", "k-core/postgres"]
|
||||
broker-nats = ["dep:futures-util", "k-core/broker-nats"]
|
||||
auth-oidc = ["dep:openidconnect", "dep:url", "dep:axum-extra"]
|
||||
auth-jwt = ["dep:jsonwebtoken"]
|
||||
|
||||
[dependencies]
|
||||
k-core = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-core", features = [
|
||||
"logging",
|
||||
"db-sqlx",
|
||||
] }
|
||||
domain = { path = "../domain" }
|
||||
|
||||
async-trait = "0.1.89"
|
||||
chrono = { version = "0.4.42", features = ["serde"] }
|
||||
sqlx = { version = "0.8.6", features = ["runtime-tokio", "chrono", "migrate"] }
|
||||
thiserror = "2.0.17"
|
||||
anyhow = "1.0"
|
||||
tokio = { version = "1.48.0", features = ["full"] }
|
||||
tracing = "0.1"
|
||||
uuid = { version = "1.19.0", features = ["v4", "serde"] }
|
||||
serde_json = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
futures-core = "0.3"
|
||||
password-auth = "1.0"
|
||||
|
||||
# Optional dependencies
|
||||
async-nats = { version = "0.45", optional = true }
|
||||
futures-util = { version = "0.3", optional = true }
|
||||
openidconnect = { version = "4.0.1", optional = true }
|
||||
url = { version = "2.5.8", optional = true }
|
||||
axum-extra = { version = "0.10", features = ["cookie-private"], optional = true }
|
||||
jsonwebtoken = { version = "9.3", optional = true }
|
||||
278
k-tv-backend/infra/src/auth/jwt.rs
Normal file
278
k-tv-backend/infra/src/auth/jwt.rs
Normal file
@@ -0,0 +1,278 @@
|
||||
//! JWT Authentication Infrastructure
|
||||
//!
|
||||
//! Provides JWT token creation and validation using HS256 (secret-based).
|
||||
//! For OIDC/JWKS validation, see the `oidc` module.
|
||||
|
||||
use domain::User;
|
||||
use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
/// Minimum secret length for production (256 bits = 32 bytes)
|
||||
const MIN_SECRET_LENGTH: usize = 32;
|
||||
|
||||
/// JWT configuration
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct JwtConfig {
|
||||
/// Secret key for HS256 signing/verification
|
||||
pub secret: String,
|
||||
/// Expected issuer (for validation)
|
||||
pub issuer: Option<String>,
|
||||
/// Expected audience (for validation)
|
||||
pub audience: Option<String>,
|
||||
/// Token expiry in hours (default: 24)
|
||||
pub expiry_hours: u64,
|
||||
}
|
||||
|
||||
impl JwtConfig {
|
||||
/// Create a new JWT config with validation
|
||||
///
|
||||
/// In production mode, this will reject weak secrets.
|
||||
pub fn new(
|
||||
secret: String,
|
||||
issuer: Option<String>,
|
||||
audience: Option<String>,
|
||||
expiry_hours: Option<u64>,
|
||||
is_production: bool,
|
||||
) -> Result<Self, JwtError> {
|
||||
// Validate secret strength in production
|
||||
if is_production && secret.len() < MIN_SECRET_LENGTH {
|
||||
return Err(JwtError::WeakSecret {
|
||||
min_length: MIN_SECRET_LENGTH,
|
||||
actual_length: secret.len(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
secret,
|
||||
issuer,
|
||||
audience,
|
||||
expiry_hours: expiry_hours.unwrap_or(24),
|
||||
})
|
||||
}
|
||||
|
||||
/// Create config without validation (for testing)
|
||||
pub fn new_unchecked(secret: String) -> Self {
|
||||
Self {
|
||||
secret,
|
||||
issuer: None,
|
||||
audience: None,
|
||||
expiry_hours: 24,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// JWT claims structure
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct JwtClaims {
|
||||
/// Subject - the user's unique identifier (user ID as string)
|
||||
pub sub: String,
|
||||
/// User's email address
|
||||
pub email: String,
|
||||
/// Expiry timestamp (seconds since UNIX epoch)
|
||||
pub exp: usize,
|
||||
/// Issued at timestamp (seconds since UNIX epoch)
|
||||
pub iat: usize,
|
||||
/// Issuer
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub iss: Option<String>,
|
||||
/// Audience
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub aud: Option<String>,
|
||||
}
|
||||
|
||||
/// JWT-related errors
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum JwtError {
|
||||
#[error("JWT secret is too weak: minimum {min_length} bytes required, got {actual_length}")]
|
||||
WeakSecret {
|
||||
min_length: usize,
|
||||
actual_length: usize,
|
||||
},
|
||||
|
||||
#[error("Token creation failed: {0}")]
|
||||
CreationFailed(#[from] jsonwebtoken::errors::Error),
|
||||
|
||||
#[error("Token validation failed: {0}")]
|
||||
ValidationFailed(String),
|
||||
|
||||
#[error("Token expired")]
|
||||
Expired,
|
||||
|
||||
#[error("Invalid token format")]
|
||||
InvalidFormat,
|
||||
|
||||
#[error("Missing configuration")]
|
||||
MissingConfig,
|
||||
}
|
||||
|
||||
/// JWT token validator and generator
|
||||
#[derive(Clone)]
|
||||
pub struct JwtValidator {
|
||||
config: JwtConfig,
|
||||
encoding_key: EncodingKey,
|
||||
decoding_key: DecodingKey,
|
||||
validation: Validation,
|
||||
}
|
||||
|
||||
impl JwtValidator {
|
||||
/// Create a new JWT validator with the given configuration
|
||||
pub fn new(config: JwtConfig) -> Self {
|
||||
let encoding_key = EncodingKey::from_secret(config.secret.as_bytes());
|
||||
let decoding_key = DecodingKey::from_secret(config.secret.as_bytes());
|
||||
|
||||
let mut validation = Validation::new(Algorithm::HS256);
|
||||
|
||||
// Configure issuer validation if set
|
||||
if let Some(ref issuer) = config.issuer {
|
||||
validation.set_issuer(&[issuer]);
|
||||
}
|
||||
|
||||
// Configure audience validation if set
|
||||
if let Some(ref audience) = config.audience {
|
||||
validation.set_audience(&[audience]);
|
||||
}
|
||||
|
||||
Self {
|
||||
config,
|
||||
encoding_key,
|
||||
decoding_key,
|
||||
validation,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a JWT token for the given user
|
||||
pub fn create_token(&self, user: &User) -> Result<String, JwtError> {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs() as usize;
|
||||
|
||||
let expiry = now + (self.config.expiry_hours as usize * 3600);
|
||||
|
||||
let claims = JwtClaims {
|
||||
sub: user.id.to_string(),
|
||||
email: user.email.as_ref().to_string(),
|
||||
exp: expiry,
|
||||
iat: now,
|
||||
iss: self.config.issuer.clone(),
|
||||
aud: self.config.audience.clone(),
|
||||
};
|
||||
|
||||
let header = Header::new(Algorithm::HS256);
|
||||
encode(&header, &claims, &self.encoding_key).map_err(JwtError::CreationFailed)
|
||||
}
|
||||
|
||||
/// Validate a JWT token and return the claims
|
||||
pub fn validate_token(&self, token: &str) -> Result<JwtClaims, JwtError> {
|
||||
let token_data = decode::<JwtClaims>(token, &self.decoding_key, &self.validation).map_err(
|
||||
|e| match e.kind() {
|
||||
jsonwebtoken::errors::ErrorKind::ExpiredSignature => JwtError::Expired,
|
||||
jsonwebtoken::errors::ErrorKind::InvalidToken => JwtError::InvalidFormat,
|
||||
_ => JwtError::ValidationFailed(e.to_string()),
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(token_data.claims)
|
||||
}
|
||||
|
||||
/// Get the user ID (subject) from a token without full validation
|
||||
/// Useful for logging/debugging, but should not be trusted for auth
|
||||
pub fn decode_unverified(&self, token: &str) -> Result<JwtClaims, JwtError> {
|
||||
let mut validation = Validation::new(Algorithm::HS256);
|
||||
validation.insecure_disable_signature_validation();
|
||||
validation.validate_exp = false;
|
||||
|
||||
let token_data = decode::<JwtClaims>(token, &self.decoding_key, &validation)
|
||||
.map_err(|_| JwtError::InvalidFormat)?;
|
||||
|
||||
Ok(token_data.claims)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for JwtValidator {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("JwtValidator")
|
||||
.field("issuer", &self.config.issuer)
|
||||
.field("audience", &self.config.audience)
|
||||
.field("expiry_hours", &self.config.expiry_hours)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use domain::Email;
|
||||
|
||||
fn create_test_user() -> User {
|
||||
let email = Email::try_from("test@example.com").unwrap();
|
||||
User::new("test-subject", email)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_and_validate_token() {
|
||||
let config = JwtConfig::new_unchecked("test-secret-key-that-is-long-enough".to_string());
|
||||
let validator = JwtValidator::new(config);
|
||||
let user = create_test_user();
|
||||
|
||||
let token = validator.create_token(&user).expect("Should create token");
|
||||
let claims = validator
|
||||
.validate_token(&token)
|
||||
.expect("Should validate token");
|
||||
|
||||
assert_eq!(claims.sub, user.id.to_string());
|
||||
assert_eq!(claims.email, "test@example.com");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_weak_secret_rejected_in_production() {
|
||||
let result = JwtConfig::new(
|
||||
"short".to_string(), // Too short
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
true, // Production mode
|
||||
);
|
||||
|
||||
assert!(matches!(result, Err(JwtError::WeakSecret { .. })));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_weak_secret_allowed_in_development() {
|
||||
let result = JwtConfig::new(
|
||||
"short".to_string(), // Too short but OK in dev
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false, // Development mode
|
||||
);
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalid_token_rejected() {
|
||||
let config = JwtConfig::new_unchecked("test-secret-key-that-is-long-enough".to_string());
|
||||
let validator = JwtValidator::new(config);
|
||||
|
||||
let result = validator.validate_token("invalid.token.here");
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrong_secret_rejected() {
|
||||
let config1 = JwtConfig::new_unchecked("secret-one-that-is-long-enough".to_string());
|
||||
let config2 = JwtConfig::new_unchecked("secret-two-that-is-long-enough".to_string());
|
||||
|
||||
let validator1 = JwtValidator::new(config1);
|
||||
let validator2 = JwtValidator::new(config2);
|
||||
|
||||
let user = create_test_user();
|
||||
let token = validator1.create_token(&user).unwrap();
|
||||
|
||||
// Token from validator1 should fail on validator2
|
||||
let result = validator2.validate_token(&token);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
19
k-tv-backend/infra/src/auth/mod.rs
Normal file
19
k-tv-backend/infra/src/auth/mod.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
//! Authentication infrastructure
|
||||
//!
|
||||
//! This module contains the concrete implementation of authentication mechanisms.
|
||||
|
||||
/// Hash a password using the password-auth crate
|
||||
pub fn hash_password(password: &str) -> String {
|
||||
password_auth::generate_hash(password)
|
||||
}
|
||||
|
||||
/// Verify a password against a stored hash
|
||||
pub fn verify_password(password: &str, hash: &str) -> bool {
|
||||
password_auth::verify_password(password, hash).is_ok()
|
||||
}
|
||||
|
||||
#[cfg(feature = "auth-oidc")]
|
||||
pub mod oidc;
|
||||
|
||||
#[cfg(feature = "auth-jwt")]
|
||||
pub mod jwt;
|
||||
212
k-tv-backend/infra/src/auth/oidc.rs
Normal file
212
k-tv-backend/infra/src/auth/oidc.rs
Normal file
@@ -0,0 +1,212 @@
|
||||
use anyhow::anyhow;
|
||||
use domain::{
|
||||
AuthorizationCode, AuthorizationUrlData, ClientId, ClientSecret, CsrfToken, IssuerUrl,
|
||||
OidcNonce, PkceVerifier, RedirectUrl, ResourceId,
|
||||
};
|
||||
use openidconnect::{
|
||||
AccessTokenHash, Client, EmptyAdditionalClaims, EndpointMaybeSet, EndpointNotSet, EndpointSet,
|
||||
OAuth2TokenResponse, PkceCodeChallenge, Scope, StandardErrorResponse, TokenResponse,
|
||||
UserInfoClaims,
|
||||
core::{
|
||||
CoreAuthDisplay, CoreAuthPrompt, CoreAuthenticationFlow, CoreClient, CoreErrorResponseType,
|
||||
CoreGenderClaim, CoreJsonWebKey, CoreJweContentEncryptionAlgorithm, CoreProviderMetadata,
|
||||
CoreRevocableToken, CoreRevocationErrorResponse, CoreTokenIntrospectionResponse,
|
||||
CoreTokenResponse,
|
||||
},
|
||||
reqwest,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub type OidcClient = Client<
|
||||
EmptyAdditionalClaims,
|
||||
CoreAuthDisplay,
|
||||
CoreGenderClaim,
|
||||
CoreJweContentEncryptionAlgorithm,
|
||||
CoreJsonWebKey,
|
||||
CoreAuthPrompt,
|
||||
StandardErrorResponse<CoreErrorResponseType>,
|
||||
CoreTokenResponse,
|
||||
CoreTokenIntrospectionResponse,
|
||||
CoreRevocableToken,
|
||||
CoreRevocationErrorResponse,
|
||||
EndpointSet, // HasAuthUrl (Required and guaranteed by discovery)
|
||||
EndpointNotSet, // HasDeviceAuthUrl
|
||||
EndpointNotSet, // HasIntrospectionUrl
|
||||
EndpointNotSet, // HasRevocationUrl
|
||||
EndpointMaybeSet, // HasTokenUrl (Discovered, might be missing)
|
||||
EndpointMaybeSet, // HasUserInfoUrl (Discovered, might be missing)
|
||||
>;
|
||||
|
||||
/// Serializable OIDC state stored in an encrypted cookie during the auth code flow
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct OidcState {
|
||||
pub csrf_token: CsrfToken,
|
||||
pub nonce: OidcNonce,
|
||||
pub pkce_verifier: PkceVerifier,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OidcService {
|
||||
client: OidcClient,
|
||||
http_client: reqwest::Client,
|
||||
resource_id: Option<ResourceId>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OidcUser {
|
||||
pub subject: String,
|
||||
pub email: String,
|
||||
}
|
||||
|
||||
impl OidcService {
|
||||
/// Create a new OIDC service with validated configuration newtypes
|
||||
pub async fn new(
|
||||
issuer: IssuerUrl,
|
||||
client_id: ClientId,
|
||||
client_secret: Option<ClientSecret>,
|
||||
redirect_url: RedirectUrl,
|
||||
resource_id: Option<ResourceId>,
|
||||
) -> anyhow::Result<Self> {
|
||||
tracing::debug!("🔵 OIDC Setup: Client ID = '{}'", client_id);
|
||||
tracing::debug!("🔵 OIDC Setup: Redirect = '{}'", redirect_url);
|
||||
tracing::debug!(
|
||||
"🔵 OIDC Setup: Secret = {:?}",
|
||||
if client_secret.is_some() { "SET" } else { "NONE" }
|
||||
);
|
||||
|
||||
let http_client = reqwest::ClientBuilder::new()
|
||||
.redirect(reqwest::redirect::Policy::none())
|
||||
.build()?;
|
||||
|
||||
let provider_metadata = CoreProviderMetadata::discover_async(
|
||||
openidconnect::IssuerUrl::new(issuer.as_ref().to_string())?,
|
||||
&http_client,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let oidc_client_id = openidconnect::ClientId::new(client_id.as_ref().to_string());
|
||||
let oidc_client_secret = client_secret
|
||||
.as_ref()
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(|s| openidconnect::ClientSecret::new(s.as_ref().to_string()));
|
||||
let oidc_redirect_url =
|
||||
openidconnect::RedirectUrl::new(redirect_url.as_ref().to_string())?;
|
||||
|
||||
let client = CoreClient::from_provider_metadata(
|
||||
provider_metadata,
|
||||
oidc_client_id,
|
||||
oidc_client_secret,
|
||||
)
|
||||
.set_redirect_uri(oidc_redirect_url);
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
http_client,
|
||||
resource_id,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the authorization URL and associated state for OIDC login.
|
||||
///
|
||||
/// Returns `(AuthorizationUrlData, OidcState)` where `OidcState` should be
|
||||
/// serialized and stored in an encrypted cookie for the duration of the flow.
|
||||
pub fn get_authorization_url(&self) -> (AuthorizationUrlData, OidcState) {
|
||||
let (pkce_challenge, pkce_verifier) = PkceCodeChallenge::new_random_sha256();
|
||||
|
||||
let (auth_url, csrf_token, nonce) = self
|
||||
.client
|
||||
.authorize_url(
|
||||
CoreAuthenticationFlow::AuthorizationCode,
|
||||
openidconnect::CsrfToken::new_random,
|
||||
openidconnect::Nonce::new_random,
|
||||
)
|
||||
.add_scope(Scope::new("profile".to_string()))
|
||||
.add_scope(Scope::new("email".to_string()))
|
||||
.set_pkce_challenge(pkce_challenge)
|
||||
.url();
|
||||
|
||||
let oidc_state = OidcState {
|
||||
csrf_token: CsrfToken::new(csrf_token.secret().to_string()),
|
||||
nonce: OidcNonce::new(nonce.secret().to_string()),
|
||||
pkce_verifier: PkceVerifier::new(pkce_verifier.secret().to_string()),
|
||||
};
|
||||
|
||||
let auth_data = AuthorizationUrlData {
|
||||
url: auth_url.into(),
|
||||
csrf_token: oidc_state.csrf_token.clone(),
|
||||
nonce: oidc_state.nonce.clone(),
|
||||
pkce_verifier: oidc_state.pkce_verifier.clone(),
|
||||
};
|
||||
|
||||
(auth_data, oidc_state)
|
||||
}
|
||||
|
||||
/// Resolve the OIDC callback with type-safe parameters
|
||||
pub async fn resolve_callback(
|
||||
&self,
|
||||
code: AuthorizationCode,
|
||||
nonce: OidcNonce,
|
||||
pkce_verifier: PkceVerifier,
|
||||
) -> anyhow::Result<OidcUser> {
|
||||
let oidc_pkce_verifier =
|
||||
openidconnect::PkceCodeVerifier::new(pkce_verifier.as_ref().to_string());
|
||||
let oidc_nonce = openidconnect::Nonce::new(nonce.as_ref().to_string());
|
||||
|
||||
let token_response = self
|
||||
.client
|
||||
.exchange_code(openidconnect::AuthorizationCode::new(
|
||||
code.as_ref().to_string(),
|
||||
))?
|
||||
.set_pkce_verifier(oidc_pkce_verifier)
|
||||
.request_async(&self.http_client)
|
||||
.await?;
|
||||
|
||||
let id_token = token_response
|
||||
.id_token()
|
||||
.ok_or_else(|| anyhow!("Server did not return an ID token"))?;
|
||||
|
||||
let mut id_token_verifier = self.client.id_token_verifier().clone();
|
||||
|
||||
if let Some(resource_id) = &self.resource_id {
|
||||
let trusted_resource_id = resource_id.as_ref().to_string();
|
||||
id_token_verifier = id_token_verifier
|
||||
.set_other_audience_verifier_fn(move |aud| aud.as_str() == trusted_resource_id);
|
||||
}
|
||||
|
||||
let claims = id_token.claims(&id_token_verifier, &oidc_nonce)?;
|
||||
|
||||
if let Some(expected_access_token_hash) = claims.access_token_hash() {
|
||||
let actual_access_token_hash = AccessTokenHash::from_token(
|
||||
token_response.access_token(),
|
||||
id_token.signing_alg()?,
|
||||
id_token.signing_key(&id_token_verifier)?,
|
||||
)?;
|
||||
|
||||
if actual_access_token_hash != *expected_access_token_hash {
|
||||
return Err(anyhow!("Invalid access token"));
|
||||
}
|
||||
}
|
||||
|
||||
let email = if let Some(email) = claims.email() {
|
||||
Some(email.as_str().to_string())
|
||||
} else {
|
||||
tracing::debug!("🔵 Email missing in ID Token, fetching UserInfo...");
|
||||
|
||||
let user_info: UserInfoClaims<EmptyAdditionalClaims, CoreGenderClaim> = self
|
||||
.client
|
||||
.user_info(token_response.access_token().clone(), None)?
|
||||
.request_async(&self.http_client)
|
||||
.await?;
|
||||
|
||||
user_info.email().map(|e| e.as_str().to_string())
|
||||
};
|
||||
|
||||
let email =
|
||||
email.ok_or_else(|| anyhow!("User has no verified email address in ZITADEL"))?;
|
||||
|
||||
Ok(OidcUser {
|
||||
subject: claims.subject().to_string(),
|
||||
email,
|
||||
})
|
||||
}
|
||||
}
|
||||
255
k-tv-backend/infra/src/channel_repository.rs
Normal file
255
k-tv-backend/infra/src/channel_repository.rs
Normal file
@@ -0,0 +1,255 @@
|
||||
//! SQLite and PostgreSQL adapters for ChannelRepository
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use sqlx::FromRow;
|
||||
use uuid::Uuid;
|
||||
|
||||
use domain::{
|
||||
Channel, ChannelId, ChannelRepository, DomainError, DomainResult, RecyclePolicy,
|
||||
ScheduleConfig, UserId,
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Row type + mapping (shared between SQLite and Postgres)
|
||||
// ============================================================================
|
||||
|
||||
#[derive(Debug, FromRow)]
|
||||
struct ChannelRow {
|
||||
id: String,
|
||||
owner_id: String,
|
||||
name: String,
|
||||
description: Option<String>,
|
||||
timezone: String,
|
||||
schedule_config: String,
|
||||
recycle_policy: String,
|
||||
created_at: String,
|
||||
updated_at: String,
|
||||
}
|
||||
|
||||
fn parse_dt(s: &str) -> Result<DateTime<Utc>, DomainError> {
|
||||
DateTime::parse_from_rfc3339(s)
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.or_else(|_| {
|
||||
chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map(|dt| dt.and_utc())
|
||||
})
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid datetime '{}': {}", s, e)))
|
||||
}
|
||||
|
||||
impl TryFrom<ChannelRow> for Channel {
|
||||
type Error = DomainError;
|
||||
|
||||
fn try_from(row: ChannelRow) -> Result<Self, Self::Error> {
|
||||
let id: ChannelId = Uuid::parse_str(&row.id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?;
|
||||
let owner_id: UserId = Uuid::parse_str(&row.owner_id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid owner UUID: {}", e)))?;
|
||||
let schedule_config: ScheduleConfig = serde_json::from_str(&row.schedule_config)
|
||||
.map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Invalid schedule_config JSON: {}", e))
|
||||
})?;
|
||||
let recycle_policy: RecyclePolicy = serde_json::from_str(&row.recycle_policy)
|
||||
.map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Invalid recycle_policy JSON: {}", e))
|
||||
})?;
|
||||
|
||||
Ok(Channel {
|
||||
id,
|
||||
owner_id,
|
||||
name: row.name,
|
||||
description: row.description,
|
||||
timezone: row.timezone,
|
||||
schedule_config,
|
||||
recycle_policy,
|
||||
created_at: parse_dt(&row.created_at)?,
|
||||
updated_at: parse_dt(&row.updated_at)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const SELECT_COLS: &str =
|
||||
"id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at";
|
||||
|
||||
// ============================================================================
|
||||
// SQLite adapter
|
||||
// ============================================================================
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub struct SqliteChannelRepository {
|
||||
pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
impl SqliteChannelRepository {
|
||||
pub fn new(pool: sqlx::SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
#[async_trait]
|
||||
impl ChannelRepository for SqliteChannelRepository {
|
||||
async fn find_by_id(&self, id: ChannelId) -> DomainResult<Option<Channel>> {
|
||||
let sql = format!("SELECT {SELECT_COLS} FROM channels WHERE id = ?");
|
||||
let row: Option<ChannelRow> = sqlx::query_as(&sql)
|
||||
.bind(id.to_string())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
row.map(Channel::try_from).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_owner(&self, owner_id: UserId) -> DomainResult<Vec<Channel>> {
|
||||
let sql = format!(
|
||||
"SELECT {SELECT_COLS} FROM channels WHERE owner_id = ? ORDER BY created_at ASC"
|
||||
);
|
||||
let rows: Vec<ChannelRow> = sqlx::query_as(&sql)
|
||||
.bind(owner_id.to_string())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(Channel::try_from).collect()
|
||||
}
|
||||
|
||||
async fn save(&self, channel: &Channel) -> DomainResult<()> {
|
||||
let schedule_config = serde_json::to_string(&channel.schedule_config).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Failed to serialize schedule_config: {}", e))
|
||||
})?;
|
||||
let recycle_policy = serde_json::to_string(&channel.recycle_policy).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Failed to serialize recycle_policy: {}", e))
|
||||
})?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO channels
|
||||
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
name = excluded.name,
|
||||
description = excluded.description,
|
||||
timezone = excluded.timezone,
|
||||
schedule_config = excluded.schedule_config,
|
||||
recycle_policy = excluded.recycle_policy,
|
||||
updated_at = excluded.updated_at
|
||||
"#,
|
||||
)
|
||||
.bind(channel.id.to_string())
|
||||
.bind(channel.owner_id.to_string())
|
||||
.bind(&channel.name)
|
||||
.bind(&channel.description)
|
||||
.bind(&channel.timezone)
|
||||
.bind(&schedule_config)
|
||||
.bind(&recycle_policy)
|
||||
.bind(channel.created_at.to_rfc3339())
|
||||
.bind(channel.updated_at.to_rfc3339())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: ChannelId) -> DomainResult<()> {
|
||||
sqlx::query("DELETE FROM channels WHERE id = ?")
|
||||
.bind(id.to_string())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// PostgreSQL adapter
|
||||
// ============================================================================
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
pub struct PostgresChannelRepository {
|
||||
pool: sqlx::Pool<sqlx::Postgres>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
impl PostgresChannelRepository {
|
||||
pub fn new(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
#[async_trait]
|
||||
impl ChannelRepository for PostgresChannelRepository {
|
||||
async fn find_by_id(&self, id: ChannelId) -> DomainResult<Option<Channel>> {
|
||||
let sql = format!("SELECT {SELECT_COLS} FROM channels WHERE id = $1");
|
||||
let row: Option<ChannelRow> = sqlx::query_as(&sql)
|
||||
.bind(id.to_string())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
row.map(Channel::try_from).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_owner(&self, owner_id: UserId) -> DomainResult<Vec<Channel>> {
|
||||
let sql = format!(
|
||||
"SELECT {SELECT_COLS} FROM channels WHERE owner_id = $1 ORDER BY created_at ASC"
|
||||
);
|
||||
let rows: Vec<ChannelRow> = sqlx::query_as(&sql)
|
||||
.bind(owner_id.to_string())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(Channel::try_from).collect()
|
||||
}
|
||||
|
||||
async fn save(&self, channel: &Channel) -> DomainResult<()> {
|
||||
let schedule_config = serde_json::to_string(&channel.schedule_config).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Failed to serialize schedule_config: {}", e))
|
||||
})?;
|
||||
let recycle_policy = serde_json::to_string(&channel.recycle_policy).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Failed to serialize recycle_policy: {}", e))
|
||||
})?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO channels
|
||||
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
name = EXCLUDED.name,
|
||||
description = EXCLUDED.description,
|
||||
timezone = EXCLUDED.timezone,
|
||||
schedule_config = EXCLUDED.schedule_config,
|
||||
recycle_policy = EXCLUDED.recycle_policy,
|
||||
updated_at = EXCLUDED.updated_at
|
||||
"#,
|
||||
)
|
||||
.bind(channel.id.to_string())
|
||||
.bind(channel.owner_id.to_string())
|
||||
.bind(&channel.name)
|
||||
.bind(&channel.description)
|
||||
.bind(&channel.timezone)
|
||||
.bind(&schedule_config)
|
||||
.bind(&recycle_policy)
|
||||
.bind(channel.created_at.to_rfc3339())
|
||||
.bind(channel.updated_at.to_rfc3339())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: ChannelId) -> DomainResult<()> {
|
||||
sqlx::query("DELETE FROM channels WHERE id = $1")
|
||||
.bind(id.to_string())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
17
k-tv-backend/infra/src/db.rs
Normal file
17
k-tv-backend/infra/src/db.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
pub use k_core::db::DatabasePool;
|
||||
|
||||
pub async fn run_migrations(pool: &DatabasePool) -> Result<(), sqlx::Error> {
|
||||
match pool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
DatabasePool::Sqlite(pool) => {
|
||||
// Point specifically to the sqlite folder
|
||||
sqlx::migrate!("../migrations_sqlite").run(pool).await?;
|
||||
}
|
||||
#[cfg(feature = "postgres")]
|
||||
DatabasePool::Postgres(pool) => {
|
||||
// Point specifically to the postgres folder
|
||||
sqlx::migrate!("../migrations_postgres").run(pool).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
71
k-tv-backend/infra/src/factory.rs
Normal file
71
k-tv-backend/infra/src/factory.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::db::DatabasePool;
|
||||
use domain::{ChannelRepository, ScheduleRepository, UserRepository};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FactoryError {
|
||||
#[error("Database error: {0}")]
|
||||
Database(#[from] sqlx::Error),
|
||||
#[error("Not implemented: {0}")]
|
||||
NotImplemented(String),
|
||||
#[error("Infrastructure error: {0}")]
|
||||
Infrastructure(#[from] domain::DomainError),
|
||||
}
|
||||
|
||||
pub type FactoryResult<T> = Result<T, FactoryError>;
|
||||
|
||||
pub async fn build_user_repository(pool: &DatabasePool) -> FactoryResult<Arc<dyn UserRepository>> {
|
||||
match pool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
DatabasePool::Sqlite(pool) => Ok(Arc::new(
|
||||
crate::user_repository::SqliteUserRepository::new(pool.clone()),
|
||||
)),
|
||||
#[cfg(feature = "postgres")]
|
||||
DatabasePool::Postgres(pool) => Ok(Arc::new(
|
||||
crate::user_repository::PostgresUserRepository::new(pool.clone()),
|
||||
)),
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => Err(FactoryError::NotImplemented(
|
||||
"No database feature enabled".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn build_channel_repository(
|
||||
pool: &DatabasePool,
|
||||
) -> FactoryResult<Arc<dyn ChannelRepository>> {
|
||||
match pool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
DatabasePool::Sqlite(pool) => Ok(Arc::new(
|
||||
crate::channel_repository::SqliteChannelRepository::new(pool.clone()),
|
||||
)),
|
||||
#[cfg(feature = "postgres")]
|
||||
DatabasePool::Postgres(pool) => Ok(Arc::new(
|
||||
crate::channel_repository::PostgresChannelRepository::new(pool.clone()),
|
||||
)),
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => Err(FactoryError::NotImplemented(
|
||||
"No database feature enabled".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn build_schedule_repository(
|
||||
pool: &DatabasePool,
|
||||
) -> FactoryResult<Arc<dyn ScheduleRepository>> {
|
||||
match pool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
DatabasePool::Sqlite(pool) => Ok(Arc::new(
|
||||
crate::schedule_repository::SqliteScheduleRepository::new(pool.clone()),
|
||||
)),
|
||||
#[cfg(feature = "postgres")]
|
||||
DatabasePool::Postgres(pool) => Ok(Arc::new(
|
||||
crate::schedule_repository::PostgresScheduleRepository::new(pool.clone()),
|
||||
)),
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => Err(FactoryError::NotImplemented(
|
||||
"No database feature enabled".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
234
k-tv-backend/infra/src/jellyfin.rs
Normal file
234
k-tv-backend/infra/src/jellyfin.rs
Normal file
@@ -0,0 +1,234 @@
|
||||
//! Jellyfin media provider adapter
|
||||
//!
|
||||
//! Implements [`IMediaProvider`] by talking to the Jellyfin HTTP API.
|
||||
//! The domain never sees Jellyfin-specific types — this module translates
|
||||
//! between Jellyfin's API model and the domain's abstract `MediaItem`/`MediaFilter`.
|
||||
|
||||
#![cfg(feature = "jellyfin")]
|
||||
|
||||
use async_trait::async_trait;
|
||||
use serde::Deserialize;
|
||||
|
||||
use domain::{ContentType, DomainError, DomainResult, IMediaProvider, MediaFilter, MediaItem, MediaItemId};
|
||||
|
||||
/// Ticks are Jellyfin's time unit: 1 tick = 100 nanoseconds → 10,000,000 ticks/sec.
|
||||
const TICKS_PER_SEC: i64 = 10_000_000;
|
||||
|
||||
// ============================================================================
|
||||
// Configuration
|
||||
// ============================================================================
|
||||
|
||||
/// Connection details for a single Jellyfin instance.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct JellyfinConfig {
|
||||
/// e.g. `"http://192.168.1.10:8096"` — no trailing slash
|
||||
pub base_url: String,
|
||||
/// Jellyfin API key (Settings → API Keys)
|
||||
pub api_key: String,
|
||||
/// The Jellyfin user ID used for library browsing
|
||||
pub user_id: String,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Adapter
|
||||
// ============================================================================
|
||||
|
||||
pub struct JellyfinMediaProvider {
|
||||
client: reqwest::Client,
|
||||
config: JellyfinConfig,
|
||||
}
|
||||
|
||||
impl JellyfinMediaProvider {
|
||||
pub fn new(config: JellyfinConfig) -> Self {
|
||||
Self {
|
||||
client: reqwest::Client::new(),
|
||||
config,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl IMediaProvider for JellyfinMediaProvider {
|
||||
/// Fetch items matching `filter` from the Jellyfin library.
|
||||
///
|
||||
/// `MediaFilter.collections` maps to Jellyfin `ParentId` (library/folder UUID).
|
||||
/// Multiple collections are not supported in a single call; the first entry wins.
|
||||
/// Decades are mapped to Jellyfin's `MinYear`/`MaxYear`.
|
||||
async fn fetch_items(&self, filter: &MediaFilter) -> DomainResult<Vec<MediaItem>> {
|
||||
let url = format!(
|
||||
"{}/Users/{}/Items",
|
||||
self.config.base_url, self.config.user_id
|
||||
);
|
||||
|
||||
let mut params: Vec<(&str, String)> = vec![
|
||||
("Recursive", "true".into()),
|
||||
("Fields", "Genres,Tags,RunTimeTicks,ProductionYear".into()),
|
||||
];
|
||||
|
||||
if let Some(ct) = &filter.content_type {
|
||||
params.push(("IncludeItemTypes", jellyfin_item_type(ct).into()));
|
||||
}
|
||||
|
||||
if !filter.genres.is_empty() {
|
||||
params.push(("Genres", filter.genres.join("|")));
|
||||
}
|
||||
|
||||
if let Some(decade) = filter.decade {
|
||||
params.push(("MinYear", decade.to_string()));
|
||||
params.push(("MaxYear", (decade + 9).to_string()));
|
||||
}
|
||||
|
||||
if !filter.tags.is_empty() {
|
||||
params.push(("Tags", filter.tags.join("|")));
|
||||
}
|
||||
|
||||
if let Some(min) = filter.min_duration_secs {
|
||||
params.push(("MinRunTimeTicks", (min as i64 * TICKS_PER_SEC).to_string()));
|
||||
}
|
||||
if let Some(max) = filter.max_duration_secs {
|
||||
params.push(("MaxRunTimeTicks", (max as i64 * TICKS_PER_SEC).to_string()));
|
||||
}
|
||||
|
||||
// Treat the first collection entry as a Jellyfin ParentId (library/folder)
|
||||
if let Some(parent_id) = filter.collections.first() {
|
||||
params.push(("ParentId", parent_id.clone()));
|
||||
}
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.get(&url)
|
||||
.header("X-Emby-Token", &self.config.api_key)
|
||||
.query(¶ms)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
DomainError::InfrastructureError(format!("Jellyfin request failed: {}", e))
|
||||
})?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(DomainError::InfrastructureError(format!(
|
||||
"Jellyfin returned HTTP {}",
|
||||
response.status()
|
||||
)));
|
||||
}
|
||||
|
||||
let body: JellyfinItemsResponse = response.json().await.map_err(|e| {
|
||||
DomainError::InfrastructureError(format!("Failed to parse Jellyfin response: {}", e))
|
||||
})?;
|
||||
|
||||
Ok(body.items.into_iter().filter_map(map_jellyfin_item).collect())
|
||||
}
|
||||
|
||||
/// Fetch a single item by its opaque ID.
|
||||
///
|
||||
/// Returns `None` if the item is not found or cannot be mapped.
|
||||
async fn fetch_by_id(&self, item_id: &MediaItemId) -> DomainResult<Option<MediaItem>> {
|
||||
let url = format!(
|
||||
"{}/Users/{}/Items",
|
||||
self.config.base_url, self.config.user_id
|
||||
);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.get(&url)
|
||||
.header("X-Emby-Token", &self.config.api_key)
|
||||
.query(&[
|
||||
("Ids", item_id.as_ref()),
|
||||
("Fields", "Genres,Tags,RunTimeTicks,ProductionYear"),
|
||||
])
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
DomainError::InfrastructureError(format!("Jellyfin request failed: {}", e))
|
||||
})?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let body: JellyfinItemsResponse = response.json().await.map_err(|e| {
|
||||
DomainError::InfrastructureError(format!("Failed to parse Jellyfin response: {}", e))
|
||||
})?;
|
||||
|
||||
Ok(body.items.into_iter().next().and_then(map_jellyfin_item))
|
||||
}
|
||||
|
||||
/// Build a direct-play stream URL for a Jellyfin item.
|
||||
///
|
||||
/// Uses `static=true` to request the original file without transcoding.
|
||||
/// The API key is embedded in the URL so the player does not need separate auth.
|
||||
async fn get_stream_url(&self, item_id: &MediaItemId) -> DomainResult<String> {
|
||||
Ok(format!(
|
||||
"{}/Videos/{}/stream?static=true&api_key={}",
|
||||
self.config.base_url,
|
||||
item_id.as_ref(),
|
||||
self.config.api_key,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Jellyfin API response types
|
||||
// ============================================================================
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct JellyfinItemsResponse {
|
||||
#[serde(rename = "Items")]
|
||||
items: Vec<JellyfinItem>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct JellyfinItem {
|
||||
#[serde(rename = "Id")]
|
||||
id: String,
|
||||
#[serde(rename = "Name")]
|
||||
name: String,
|
||||
#[serde(rename = "Type")]
|
||||
item_type: String,
|
||||
#[serde(rename = "RunTimeTicks")]
|
||||
run_time_ticks: Option<i64>,
|
||||
#[serde(rename = "Genres")]
|
||||
genres: Option<Vec<String>>,
|
||||
#[serde(rename = "ProductionYear")]
|
||||
production_year: Option<u16>,
|
||||
#[serde(rename = "Tags")]
|
||||
tags: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Mapping helpers
|
||||
// ============================================================================
|
||||
|
||||
fn jellyfin_item_type(ct: &ContentType) -> &'static str {
|
||||
match ct {
|
||||
ContentType::Movie => "Movie",
|
||||
ContentType::Episode => "Episode",
|
||||
// Jellyfin has no native "Short" type; short films are filed as Movies
|
||||
ContentType::Short => "Movie",
|
||||
}
|
||||
}
|
||||
|
||||
/// Map a raw Jellyfin item to a domain `MediaItem`. Returns `None` for unknown
|
||||
/// item types (e.g. Season, Series, Folder) so they are silently skipped.
|
||||
fn map_jellyfin_item(item: JellyfinItem) -> Option<MediaItem> {
|
||||
let content_type = match item.item_type.as_str() {
|
||||
"Movie" => ContentType::Movie,
|
||||
"Episode" => ContentType::Episode,
|
||||
_ => return None,
|
||||
};
|
||||
|
||||
let duration_secs = item
|
||||
.run_time_ticks
|
||||
.map(|t| (t / TICKS_PER_SEC) as u32)
|
||||
.unwrap_or(0);
|
||||
|
||||
Some(MediaItem {
|
||||
id: MediaItemId::new(item.id),
|
||||
title: item.name,
|
||||
content_type,
|
||||
duration_secs,
|
||||
genres: item.genres.unwrap_or_default(),
|
||||
year: item.production_year,
|
||||
tags: item.tags.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
35
k-tv-backend/infra/src/lib.rs
Normal file
35
k-tv-backend/infra/src/lib.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
//! K-TV Infrastructure Layer
|
||||
//!
|
||||
//! Concrete adapters for the repository and provider ports defined in the domain.
|
||||
//!
|
||||
//! ## Repository adapters
|
||||
//! - `SqliteUserRepository` / `PostgresUserRepository`
|
||||
//! - `SqliteChannelRepository` / `PostgresChannelRepository`
|
||||
//! - `SqliteScheduleRepository` / `PostgresScheduleRepository`
|
||||
//!
|
||||
//! ## Media provider adapters
|
||||
//! - `JellyfinMediaProvider` (feature = `"jellyfin"`)
|
||||
//!
|
||||
//! ## Database
|
||||
//! - [`db::run_migrations`] — run all pending SQLite/Postgres migrations
|
||||
|
||||
pub mod auth;
|
||||
pub mod db;
|
||||
pub mod factory;
|
||||
pub mod jellyfin;
|
||||
mod channel_repository;
|
||||
mod schedule_repository;
|
||||
mod user_repository;
|
||||
|
||||
// Re-export for convenience
|
||||
pub use db::run_migrations;
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use user_repository::SqliteUserRepository;
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use channel_repository::SqliteChannelRepository;
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use schedule_repository::SqliteScheduleRepository;
|
||||
|
||||
#[cfg(feature = "jellyfin")]
|
||||
pub use jellyfin::{JellyfinConfig, JellyfinMediaProvider};
|
||||
447
k-tv-backend/infra/src/schedule_repository.rs
Normal file
447
k-tv-backend/infra/src/schedule_repository.rs
Normal file
@@ -0,0 +1,447 @@
|
||||
//! SQLite and PostgreSQL adapters for ScheduleRepository
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use sqlx::FromRow;
|
||||
use uuid::Uuid;
|
||||
|
||||
use domain::{
|
||||
ChannelId, DomainError, DomainResult, GeneratedSchedule, MediaItem, MediaItemId,
|
||||
PlaybackRecord, ScheduleRepository, ScheduledSlot,
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Row types
|
||||
// ============================================================================
|
||||
|
||||
#[derive(Debug, FromRow)]
|
||||
struct ScheduleRow {
|
||||
id: String,
|
||||
channel_id: String,
|
||||
valid_from: String,
|
||||
valid_until: String,
|
||||
generation: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, FromRow)]
|
||||
struct SlotRow {
|
||||
id: String,
|
||||
// schedule_id selected but only used to drive the JOIN; not needed for domain type
|
||||
#[allow(dead_code)]
|
||||
schedule_id: String,
|
||||
start_at: String,
|
||||
end_at: String,
|
||||
item: String,
|
||||
source_block_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, FromRow)]
|
||||
struct PlaybackRecordRow {
|
||||
id: String,
|
||||
channel_id: String,
|
||||
item_id: String,
|
||||
played_at: String,
|
||||
generation: i64,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Mapping
|
||||
// ============================================================================
|
||||
|
||||
fn parse_dt(s: &str) -> Result<DateTime<Utc>, DomainError> {
|
||||
DateTime::parse_from_rfc3339(s)
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.or_else(|_| {
|
||||
chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map(|dt| dt.and_utc())
|
||||
})
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid datetime '{}': {}", s, e)))
|
||||
}
|
||||
|
||||
fn map_slot_row(row: SlotRow) -> Result<ScheduledSlot, DomainError> {
|
||||
let id = Uuid::parse_str(&row.id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid slot UUID: {}", e)))?;
|
||||
let source_block_id = Uuid::parse_str(&row.source_block_id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid block UUID: {}", e)))?;
|
||||
let item: MediaItem = serde_json::from_str(&row.item)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid slot item JSON: {}", e)))?;
|
||||
|
||||
Ok(ScheduledSlot {
|
||||
id,
|
||||
start_at: parse_dt(&row.start_at)?,
|
||||
end_at: parse_dt(&row.end_at)?,
|
||||
item,
|
||||
source_block_id,
|
||||
})
|
||||
}
|
||||
|
||||
fn map_schedule(row: ScheduleRow, slot_rows: Vec<SlotRow>) -> Result<GeneratedSchedule, DomainError> {
|
||||
let id = Uuid::parse_str(&row.id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid schedule UUID: {}", e)))?;
|
||||
let channel_id = Uuid::parse_str(&row.channel_id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?;
|
||||
|
||||
let slots: Result<Vec<ScheduledSlot>, _> = slot_rows.into_iter().map(map_slot_row).collect();
|
||||
|
||||
Ok(GeneratedSchedule {
|
||||
id,
|
||||
channel_id,
|
||||
valid_from: parse_dt(&row.valid_from)?,
|
||||
valid_until: parse_dt(&row.valid_until)?,
|
||||
generation: row.generation as u32,
|
||||
slots: slots?,
|
||||
})
|
||||
}
|
||||
|
||||
impl TryFrom<PlaybackRecordRow> for PlaybackRecord {
|
||||
type Error = DomainError;
|
||||
|
||||
fn try_from(row: PlaybackRecordRow) -> Result<Self, Self::Error> {
|
||||
let id = Uuid::parse_str(&row.id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?;
|
||||
let channel_id = Uuid::parse_str(&row.channel_id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?;
|
||||
|
||||
Ok(PlaybackRecord {
|
||||
id,
|
||||
channel_id,
|
||||
item_id: MediaItemId::new(row.item_id),
|
||||
played_at: parse_dt(&row.played_at)?,
|
||||
generation: row.generation as u32,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SQLite adapter
|
||||
// ============================================================================
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub struct SqliteScheduleRepository {
|
||||
pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
impl SqliteScheduleRepository {
|
||||
pub fn new(pool: sqlx::SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
|
||||
async fn fetch_slots(&self, schedule_id: &str) -> DomainResult<Vec<SlotRow>> {
|
||||
sqlx::query_as(
|
||||
"SELECT id, schedule_id, start_at, end_at, item, source_block_id \
|
||||
FROM scheduled_slots WHERE schedule_id = ? ORDER BY start_at",
|
||||
)
|
||||
.bind(schedule_id)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
#[async_trait]
|
||||
impl ScheduleRepository for SqliteScheduleRepository {
|
||||
async fn find_active(
|
||||
&self,
|
||||
channel_id: ChannelId,
|
||||
at: DateTime<Utc>,
|
||||
) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
let at_str = at.to_rfc3339();
|
||||
let row: Option<ScheduleRow> = sqlx::query_as(
|
||||
"SELECT id, channel_id, valid_from, valid_until, generation \
|
||||
FROM generated_schedules \
|
||||
WHERE channel_id = ? AND valid_from <= ? AND valid_until > ? \
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(channel_id.to_string())
|
||||
.bind(&at_str)
|
||||
.bind(&at_str)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
match row {
|
||||
None => Ok(None),
|
||||
Some(r) => {
|
||||
let slots = self.fetch_slots(&r.id).await?;
|
||||
Some(map_schedule(r, slots)).transpose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_latest(&self, channel_id: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
let row: Option<ScheduleRow> = sqlx::query_as(
|
||||
"SELECT id, channel_id, valid_from, valid_until, generation \
|
||||
FROM generated_schedules \
|
||||
WHERE channel_id = ? ORDER BY valid_from DESC LIMIT 1",
|
||||
)
|
||||
.bind(channel_id.to_string())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
match row {
|
||||
None => Ok(None),
|
||||
Some(r) => {
|
||||
let slots = self.fetch_slots(&r.id).await?;
|
||||
Some(map_schedule(r, slots)).transpose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> {
|
||||
// Upsert the schedule header
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
valid_from = excluded.valid_from,
|
||||
valid_until = excluded.valid_until,
|
||||
generation = excluded.generation
|
||||
"#,
|
||||
)
|
||||
.bind(schedule.id.to_string())
|
||||
.bind(schedule.channel_id.to_string())
|
||||
.bind(schedule.valid_from.to_rfc3339())
|
||||
.bind(schedule.valid_until.to_rfc3339())
|
||||
.bind(schedule.generation as i64)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
// Replace all slots (delete-then-insert is safe here; schedule saves are
|
||||
// infrequent and atomic within a single-writer SQLite connection)
|
||||
sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = ?")
|
||||
.bind(schedule.id.to_string())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
for slot in &schedule.slots {
|
||||
let item_json = serde_json::to_string(&slot.item).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e))
|
||||
})?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(slot.id.to_string())
|
||||
.bind(schedule.id.to_string())
|
||||
.bind(slot.start_at.to_rfc3339())
|
||||
.bind(slot.end_at.to_rfc3339())
|
||||
.bind(&item_json)
|
||||
.bind(slot.source_block_id.to_string())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn find_playback_history(
|
||||
&self,
|
||||
channel_id: ChannelId,
|
||||
) -> DomainResult<Vec<PlaybackRecord>> {
|
||||
let rows: Vec<PlaybackRecordRow> = sqlx::query_as(
|
||||
"SELECT id, channel_id, item_id, played_at, generation \
|
||||
FROM playback_records WHERE channel_id = ? ORDER BY played_at DESC",
|
||||
)
|
||||
.bind(channel_id.to_string())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(PlaybackRecord::try_from).collect()
|
||||
}
|
||||
|
||||
async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO playback_records (id, channel_id, item_id, played_at, generation)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(record.id.to_string())
|
||||
.bind(record.channel_id.to_string())
|
||||
.bind(record.item_id.as_ref())
|
||||
.bind(record.played_at.to_rfc3339())
|
||||
.bind(record.generation as i64)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// PostgreSQL adapter
|
||||
// ============================================================================
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
pub struct PostgresScheduleRepository {
|
||||
pool: sqlx::Pool<sqlx::Postgres>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
impl PostgresScheduleRepository {
|
||||
pub fn new(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
|
||||
async fn fetch_slots(&self, schedule_id: &str) -> DomainResult<Vec<SlotRow>> {
|
||||
sqlx::query_as(
|
||||
"SELECT id, schedule_id, start_at, end_at, item, source_block_id \
|
||||
FROM scheduled_slots WHERE schedule_id = $1 ORDER BY start_at",
|
||||
)
|
||||
.bind(schedule_id)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
#[async_trait]
|
||||
impl ScheduleRepository for PostgresScheduleRepository {
|
||||
async fn find_active(
|
||||
&self,
|
||||
channel_id: ChannelId,
|
||||
at: DateTime<Utc>,
|
||||
) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
let at_str = at.to_rfc3339();
|
||||
let row: Option<ScheduleRow> = sqlx::query_as(
|
||||
"SELECT id, channel_id, valid_from, valid_until, generation \
|
||||
FROM generated_schedules \
|
||||
WHERE channel_id = $1 AND valid_from <= $2 AND valid_until > $3 \
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(channel_id.to_string())
|
||||
.bind(&at_str)
|
||||
.bind(&at_str)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
match row {
|
||||
None => Ok(None),
|
||||
Some(r) => {
|
||||
let slots = self.fetch_slots(&r.id).await?;
|
||||
Some(map_schedule(r, slots)).transpose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_latest(&self, channel_id: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
let row: Option<ScheduleRow> = sqlx::query_as(
|
||||
"SELECT id, channel_id, valid_from, valid_until, generation \
|
||||
FROM generated_schedules \
|
||||
WHERE channel_id = $1 ORDER BY valid_from DESC LIMIT 1",
|
||||
)
|
||||
.bind(channel_id.to_string())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
match row {
|
||||
None => Ok(None),
|
||||
Some(r) => {
|
||||
let slots = self.fetch_slots(&r.id).await?;
|
||||
Some(map_schedule(r, slots)).transpose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
valid_from = EXCLUDED.valid_from,
|
||||
valid_until = EXCLUDED.valid_until,
|
||||
generation = EXCLUDED.generation
|
||||
"#,
|
||||
)
|
||||
.bind(schedule.id.to_string())
|
||||
.bind(schedule.channel_id.to_string())
|
||||
.bind(schedule.valid_from.to_rfc3339())
|
||||
.bind(schedule.valid_until.to_rfc3339())
|
||||
.bind(schedule.generation as i64)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = $1")
|
||||
.bind(schedule.id.to_string())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
for slot in &schedule.slots {
|
||||
let item_json = serde_json::to_string(&slot.item).map_err(|e| {
|
||||
DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e))
|
||||
})?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
"#,
|
||||
)
|
||||
.bind(slot.id.to_string())
|
||||
.bind(schedule.id.to_string())
|
||||
.bind(slot.start_at.to_rfc3339())
|
||||
.bind(slot.end_at.to_rfc3339())
|
||||
.bind(&item_json)
|
||||
.bind(slot.source_block_id.to_string())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn find_playback_history(
|
||||
&self,
|
||||
channel_id: ChannelId,
|
||||
) -> DomainResult<Vec<PlaybackRecord>> {
|
||||
let rows: Vec<PlaybackRecordRow> = sqlx::query_as(
|
||||
"SELECT id, channel_id, item_id, played_at, generation \
|
||||
FROM playback_records WHERE channel_id = $1 ORDER BY played_at DESC",
|
||||
)
|
||||
.bind(channel_id.to_string())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(PlaybackRecord::try_from).collect()
|
||||
}
|
||||
|
||||
async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO playback_records (id, channel_id, item_id, played_at, generation)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT(id) DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(record.id.to_string())
|
||||
.bind(record.channel_id.to_string())
|
||||
.bind(record.item_id.as_ref())
|
||||
.bind(record.played_at.to_rfc3339())
|
||||
.bind(record.generation as i64)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
324
k-tv-backend/infra/src/user_repository.rs
Normal file
324
k-tv-backend/infra/src/user_repository.rs
Normal file
@@ -0,0 +1,324 @@
|
||||
//! SQLite and PostgreSQL implementations of UserRepository
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use sqlx::FromRow;
|
||||
use uuid::Uuid;
|
||||
|
||||
use domain::{DomainError, DomainResult, Email, User, UserRepository};
|
||||
|
||||
/// Row type for database query results (shared between SQLite and PostgreSQL)
|
||||
#[derive(Debug, FromRow)]
|
||||
struct UserRow {
|
||||
id: String,
|
||||
subject: String,
|
||||
email: String,
|
||||
password_hash: Option<String>,
|
||||
created_at: String,
|
||||
}
|
||||
|
||||
impl TryFrom<UserRow> for User {
|
||||
type Error = DomainError;
|
||||
|
||||
fn try_from(row: UserRow) -> Result<Self, Self::Error> {
|
||||
let id = Uuid::parse_str(&row.id)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?;
|
||||
let created_at = DateTime::parse_from_rfc3339(&row.created_at)
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.or_else(|_| {
|
||||
// Fallback for SQLite datetime format
|
||||
chrono::NaiveDateTime::parse_from_str(&row.created_at, "%Y-%m-%d %H:%M:%S")
|
||||
.map(|dt| dt.and_utc())
|
||||
})
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid datetime: {}", e)))?;
|
||||
|
||||
let email = Email::try_from(row.email)
|
||||
.map_err(|e| DomainError::RepositoryError(format!("Invalid email in DB: {}", e)))?;
|
||||
|
||||
Ok(User::with_id(
|
||||
id,
|
||||
row.subject,
|
||||
email,
|
||||
row.password_hash,
|
||||
created_at,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// SQLite adapter for UserRepository
|
||||
#[cfg(feature = "sqlite")]
|
||||
#[derive(Clone)]
|
||||
pub struct SqliteUserRepository {
|
||||
pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
impl SqliteUserRepository {
|
||||
pub fn new(pool: sqlx::SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
#[async_trait]
|
||||
impl UserRepository for SqliteUserRepository {
|
||||
async fn find_by_id(&self, id: Uuid) -> DomainResult<Option<User>> {
|
||||
let id_str = id.to_string();
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE id = ?",
|
||||
)
|
||||
.bind(&id_str)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
row.map(User::try_from).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_subject(&self, subject: &str) -> DomainResult<Option<User>> {
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE subject = ?",
|
||||
)
|
||||
.bind(subject)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
row.map(User::try_from).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_email(&self, email: &str) -> DomainResult<Option<User>> {
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE email = ?",
|
||||
)
|
||||
.bind(email)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
row.map(User::try_from).transpose()
|
||||
}
|
||||
|
||||
async fn save(&self, user: &User) -> DomainResult<()> {
|
||||
let id = user.id.to_string();
|
||||
let created_at = user.created_at.to_rfc3339();
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO users (id, subject, email, password_hash, created_at)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
subject = excluded.subject,
|
||||
email = excluded.email,
|
||||
password_hash = excluded.password_hash
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&user.subject)
|
||||
.bind(user.email.as_ref())
|
||||
.bind(&user.password_hash)
|
||||
.bind(&created_at)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
// Surface UNIQUE constraint violations as domain-level conflicts
|
||||
let msg = e.to_string();
|
||||
if msg.contains("UNIQUE constraint failed") || msg.contains("unique constraint") {
|
||||
DomainError::UserAlreadyExists(user.email.as_ref().to_string())
|
||||
} else {
|
||||
DomainError::RepositoryError(msg)
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: Uuid) -> DomainResult<()> {
|
||||
let id_str = id.to_string();
|
||||
sqlx::query("DELETE FROM users WHERE id = ?")
|
||||
.bind(&id_str)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "sqlite"))]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::db::run_migrations;
|
||||
use k_core::db::{DatabaseConfig, DatabasePool, connect};
|
||||
|
||||
async fn setup_test_db() -> sqlx::SqlitePool {
|
||||
let config = DatabaseConfig::default();
|
||||
let db_pool = connect(&config).await.expect("Failed to create pool");
|
||||
|
||||
run_migrations(&db_pool).await.unwrap();
|
||||
|
||||
match db_pool {
|
||||
DatabasePool::Sqlite(pool) => pool,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_save_and_find_user() {
|
||||
let pool = setup_test_db().await;
|
||||
let repo = SqliteUserRepository::new(pool);
|
||||
|
||||
let email = Email::try_from("test@example.com").unwrap();
|
||||
let user = User::new("oidc|123", email);
|
||||
repo.save(&user).await.unwrap();
|
||||
|
||||
let found = repo.find_by_id(user.id).await.unwrap();
|
||||
assert!(found.is_some());
|
||||
let found = found.unwrap();
|
||||
assert_eq!(found.subject, "oidc|123");
|
||||
assert_eq!(found.email.as_ref(), "test@example.com");
|
||||
assert!(found.password_hash.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_save_and_find_user_with_password() {
|
||||
let pool = setup_test_db().await;
|
||||
let repo = SqliteUserRepository::new(pool);
|
||||
|
||||
let email = Email::try_from("local@example.com").unwrap();
|
||||
let user = User::new_local(email, "hashed_pw");
|
||||
repo.save(&user).await.unwrap();
|
||||
|
||||
let found = repo.find_by_id(user.id).await.unwrap();
|
||||
assert!(found.is_some());
|
||||
let found = found.unwrap();
|
||||
assert_eq!(found.email.as_ref(), "local@example.com");
|
||||
assert_eq!(found.password_hash, Some("hashed_pw".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_find_by_subject() {
|
||||
let pool = setup_test_db().await;
|
||||
let repo = SqliteUserRepository::new(pool);
|
||||
|
||||
let email = Email::try_from("user@gmail.com").unwrap();
|
||||
let user = User::new("google|456", email);
|
||||
repo.save(&user).await.unwrap();
|
||||
|
||||
let found = repo.find_by_subject("google|456").await.unwrap();
|
||||
assert!(found.is_some());
|
||||
assert_eq!(found.unwrap().id, user.id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_user() {
|
||||
let pool = setup_test_db().await;
|
||||
let repo = SqliteUserRepository::new(pool);
|
||||
|
||||
let email = Email::try_from("delete@test.com").unwrap();
|
||||
let user = User::new("test|789", email);
|
||||
repo.save(&user).await.unwrap();
|
||||
repo.delete(user.id).await.unwrap();
|
||||
|
||||
let found = repo.find_by_id(user.id).await.unwrap();
|
||||
assert!(found.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
/// PostgreSQL adapter for UserRepository
|
||||
#[cfg(feature = "postgres")]
|
||||
#[derive(Clone)]
|
||||
pub struct PostgresUserRepository {
|
||||
pool: sqlx::Pool<sqlx::Postgres>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
impl PostgresUserRepository {
|
||||
pub fn new(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
#[async_trait]
|
||||
impl UserRepository for PostgresUserRepository {
|
||||
async fn find_by_id(&self, id: Uuid) -> DomainResult<Option<User>> {
|
||||
let id_str = id.to_string();
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE id = $1",
|
||||
)
|
||||
.bind(&id_str)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
row.map(User::try_from).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_subject(&self, subject: &str) -> DomainResult<Option<User>> {
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE subject = $1",
|
||||
)
|
||||
.bind(subject)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
row.map(User::try_from).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_email(&self, email: &str) -> DomainResult<Option<User>> {
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE email = $1",
|
||||
)
|
||||
.bind(email)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
row.map(User::try_from).transpose()
|
||||
}
|
||||
|
||||
async fn save(&self, user: &User) -> DomainResult<()> {
|
||||
let id = user.id.to_string();
|
||||
let created_at = user.created_at.to_rfc3339();
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO users (id, subject, email, password_hash, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
subject = excluded.subject,
|
||||
email = excluded.email,
|
||||
password_hash = excluded.password_hash
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&user.subject)
|
||||
.bind(user.email.as_ref())
|
||||
.bind(&user.password_hash)
|
||||
.bind(&created_at)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
let msg = e.to_string();
|
||||
if msg.contains("unique constraint") || msg.contains("duplicate key") {
|
||||
DomainError::UserAlreadyExists(user.email.as_ref().to_string())
|
||||
} else {
|
||||
DomainError::RepositoryError(msg)
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: Uuid) -> DomainResult<()> {
|
||||
let id_str = id.to_string();
|
||||
sqlx::query("DELETE FROM users WHERE id = $1")
|
||||
.bind(&id_str)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user