diff --git a/k-tv-backend/api/src/routes/auth.rs b/k-tv-backend/api/src/routes/auth.rs deleted file mode 100644 index 4feb871..0000000 --- a/k-tv-backend/api/src/routes/auth.rs +++ /dev/null @@ -1,257 +0,0 @@ -//! Authentication routes -//! -//! Provides login, register, logout, token, and OIDC endpoints. -//! All authentication is JWT-based. OIDC state is stored in an encrypted cookie. - -use axum::{ - Router, - extract::{Json, State}, - http::StatusCode, - response::IntoResponse, - routing::{get, post}, -}; - -use crate::{ - dto::{LoginRequest, RegisterRequest, TokenResponse, UserResponse}, - error::ApiError, - extractors::CurrentUser, - state::AppState, -}; - -pub fn router() -> Router { - let r = Router::new() - .route("/login", post(login)) - .route("/register", post(register)) - .route("/logout", post(logout)) - .route("/me", get(me)); - - #[cfg(feature = "auth-jwt")] - let r = r.route("/token", post(get_token)); - - #[cfg(feature = "auth-oidc")] - let r = r - .route("/login/oidc", get(oidc_login)) - .route("/callback", get(oidc_callback)); - - r -} - -/// Login with email + password → JWT token -async fn login( - State(state): State, - Json(payload): Json, -) -> Result { - let user = state - .user_service - .find_by_email(payload.email.as_ref()) - .await? - .ok_or_else(|| ApiError::Unauthorized("Invalid credentials".to_string()))?; - - let hash = user - .password_hash - .as_deref() - .ok_or_else(|| ApiError::Unauthorized("Invalid credentials".to_string()))?; - - if !infra::auth::verify_password(payload.password.as_ref(), hash) { - return Err(ApiError::Unauthorized("Invalid credentials".to_string())); - } - - let token = create_jwt(&user, &state)?; - - Ok(( - StatusCode::OK, - Json(TokenResponse { - access_token: token, - token_type: "Bearer".to_string(), - expires_in: state.config.jwt_expiry_hours * 3600, - }), - )) -} - -/// Register a new local user → JWT token -async fn register( - State(state): State, - Json(payload): Json, -) -> Result { - if !state.config.allow_registration { - return Err(ApiError::Forbidden("Registration is disabled".to_string())); - } - - let password_hash = infra::auth::hash_password(payload.password.as_ref()); - - let user = state - .user_service - .create_local(payload.email.as_ref(), &password_hash) - .await?; - - let token = create_jwt(&user, &state)?; - - Ok(( - StatusCode::CREATED, - Json(TokenResponse { - access_token: token, - token_type: "Bearer".to_string(), - expires_in: state.config.jwt_expiry_hours * 3600, - }), - )) -} - -/// Logout — JWT is stateless; instruct the client to drop the token -async fn logout() -> impl IntoResponse { - StatusCode::OK -} - -/// Get current user info from JWT -async fn me(CurrentUser(user): CurrentUser) -> Result { - Ok(Json(UserResponse { - id: user.id, - email: user.email.into_inner(), - created_at: user.created_at, - })) -} - -/// Issue a new JWT for the currently authenticated user (OIDC→JWT exchange or token refresh) -#[cfg(feature = "auth-jwt")] -async fn get_token( - State(state): State, - CurrentUser(user): CurrentUser, -) -> Result { - let token = create_jwt(&user, &state)?; - - Ok(Json(TokenResponse { - access_token: token, - token_type: "Bearer".to_string(), - expires_in: state.config.jwt_expiry_hours * 3600, - })) -} - -/// Helper: create JWT for a user -#[cfg(feature = "auth-jwt")] -fn create_jwt(user: &domain::User, state: &AppState) -> Result { - let validator = state - .jwt_validator - .as_ref() - .ok_or_else(|| ApiError::Internal("JWT not configured".to_string()))?; - - validator - .create_token(user) - .map_err(|e| ApiError::Internal(format!("Failed to create token: {}", e))) -} - -#[cfg(not(feature = "auth-jwt"))] -fn create_jwt(_user: &domain::User, _state: &AppState) -> Result { - Err(ApiError::Internal("JWT feature not enabled".to_string())) -} - -// ============================================================================ -// OIDC Routes -// ============================================================================ - -#[cfg(feature = "auth-oidc")] -#[derive(serde::Deserialize)] -struct CallbackParams { - code: String, - state: String, -} - -/// Start OIDC login: generate authorization URL and store state in encrypted cookie -#[cfg(feature = "auth-oidc")] -async fn oidc_login( - State(state): State, - jar: axum_extra::extract::PrivateCookieJar, -) -> Result { - use axum::http::header; - use axum::response::Response; - use axum_extra::extract::cookie::{Cookie, SameSite}; - - let service = state - .oidc_service - .as_ref() - .ok_or(ApiError::Internal("OIDC not configured".into()))?; - - let (auth_data, oidc_state) = service.get_authorization_url(); - - let state_json = serde_json::to_string(&oidc_state) - .map_err(|e| ApiError::Internal(format!("Failed to serialize OIDC state: {}", e)))?; - - let cookie = Cookie::build(("oidc_state", state_json)) - .max_age(time::Duration::minutes(5)) - .http_only(true) - .same_site(SameSite::Lax) - .secure(state.config.secure_cookie) - .path("/") - .build(); - - let updated_jar = jar.add(cookie); - - let redirect = axum::response::Redirect::to(auth_data.url.as_str()).into_response(); - let (mut parts, body) = redirect.into_parts(); - parts.headers.insert( - header::CACHE_CONTROL, - "no-cache, no-store, must-revalidate".parse().unwrap(), - ); - parts - .headers - .insert(header::PRAGMA, "no-cache".parse().unwrap()); - parts.headers.insert(header::EXPIRES, "0".parse().unwrap()); - - Ok((updated_jar, Response::from_parts(parts, body))) -} - -/// Handle OIDC callback: verify state cookie, complete exchange, issue JWT, clear cookie -#[cfg(feature = "auth-oidc")] -async fn oidc_callback( - State(state): State, - jar: axum_extra::extract::PrivateCookieJar, - axum::extract::Query(params): axum::extract::Query, -) -> Result { - use infra::auth::oidc::OidcState; - - let service = state - .oidc_service - .as_ref() - .ok_or(ApiError::Internal("OIDC not configured".into()))?; - - // Read and decrypt OIDC state from cookie - let cookie = jar - .get("oidc_state") - .ok_or(ApiError::Validation("Missing OIDC state cookie".into()))?; - - let oidc_state: OidcState = serde_json::from_str(cookie.value()) - .map_err(|_| ApiError::Validation("Invalid OIDC state cookie".into()))?; - - // Verify CSRF token - if params.state != oidc_state.csrf_token.as_ref() { - return Err(ApiError::Validation("Invalid CSRF token".into())); - } - - // Complete OIDC exchange - let oidc_user = service - .resolve_callback( - domain::AuthorizationCode::new(params.code), - oidc_state.nonce, - oidc_state.pkce_verifier, - ) - .await - .map_err(|e| ApiError::Internal(e.to_string()))?; - - let user = state - .user_service - .find_or_create(&oidc_user.subject, &oidc_user.email) - .await - .map_err(|e| ApiError::Internal(e.to_string()))?; - - // Clear the OIDC state cookie - let cleared_jar = jar.remove(axum_extra::extract::cookie::Cookie::from("oidc_state")); - - let token = create_jwt(&user, &state)?; - - Ok(( - cleared_jar, - Json(TokenResponse { - access_token: token, - token_type: "Bearer".to_string(), - expires_in: state.config.jwt_expiry_hours * 3600, - }), - )) -} diff --git a/k-tv-backend/api/src/routes/auth/local.rs b/k-tv-backend/api/src/routes/auth/local.rs new file mode 100644 index 0000000..03e601f --- /dev/null +++ b/k-tv-backend/api/src/routes/auth/local.rs @@ -0,0 +1,104 @@ +use axum::{ + Json, + extract::State, + http::StatusCode, + response::IntoResponse, +}; + +use crate::{ + dto::{LoginRequest, RegisterRequest, TokenResponse, UserResponse}, + error::ApiError, + extractors::CurrentUser, + state::AppState, +}; + +use super::create_jwt; + +/// Login with email + password → JWT token +pub(super) async fn login( + State(state): State, + Json(payload): Json, +) -> Result { + let user = state + .user_service + .find_by_email(payload.email.as_ref()) + .await? + .ok_or_else(|| ApiError::Unauthorized("Invalid credentials".to_string()))?; + + let hash = user + .password_hash + .as_deref() + .ok_or_else(|| ApiError::Unauthorized("Invalid credentials".to_string()))?; + + if !infra::auth::verify_password(payload.password.as_ref(), hash) { + return Err(ApiError::Unauthorized("Invalid credentials".to_string())); + } + + let token = create_jwt(&user, &state)?; + + Ok(( + StatusCode::OK, + Json(TokenResponse { + access_token: token, + token_type: "Bearer".to_string(), + expires_in: state.config.jwt_expiry_hours * 3600, + }), + )) +} + +/// Register a new local user → JWT token +pub(super) async fn register( + State(state): State, + Json(payload): Json, +) -> Result { + if !state.config.allow_registration { + return Err(ApiError::Forbidden("Registration is disabled".to_string())); + } + + let password_hash = infra::auth::hash_password(payload.password.as_ref()); + + let user = state + .user_service + .create_local(payload.email.as_ref(), &password_hash) + .await?; + + let token = create_jwt(&user, &state)?; + + Ok(( + StatusCode::CREATED, + Json(TokenResponse { + access_token: token, + token_type: "Bearer".to_string(), + expires_in: state.config.jwt_expiry_hours * 3600, + }), + )) +} + +/// Logout — JWT is stateless; instruct the client to drop the token +pub(super) async fn logout() -> impl IntoResponse { + StatusCode::OK +} + +/// Get current user info from JWT +pub(super) async fn me(CurrentUser(user): CurrentUser) -> Result { + Ok(Json(UserResponse { + id: user.id, + email: user.email.into_inner(), + created_at: user.created_at, + })) +} + +/// Issue a new JWT for the currently authenticated user (OIDC→JWT exchange or token refresh) +#[cfg(feature = "auth-jwt")] +pub(super) async fn get_token( + State(state): State, + CurrentUser(user): CurrentUser, +) -> Result { + let token = create_jwt(&user, &state)?; + + Ok(Json(TokenResponse { + access_token: token, + token_type: "Bearer".to_string(), + expires_in: state.config.jwt_expiry_hours * 3600, + })) +} diff --git a/k-tv-backend/api/src/routes/auth/mod.rs b/k-tv-backend/api/src/routes/auth/mod.rs new file mode 100644 index 0000000..b9384f3 --- /dev/null +++ b/k-tv-backend/api/src/routes/auth/mod.rs @@ -0,0 +1,47 @@ +//! Authentication routes +//! +//! Provides login, register, logout, token, and OIDC endpoints. +//! All authentication is JWT-based. OIDC state is stored in an encrypted cookie. + +use axum::{Router, routing::{get, post}}; + +use crate::{error::ApiError, state::AppState}; + +mod local; +mod oidc; + +pub fn router() -> Router { + let r = Router::new() + .route("/login", post(local::login)) + .route("/register", post(local::register)) + .route("/logout", post(local::logout)) + .route("/me", get(local::me)); + + #[cfg(feature = "auth-jwt")] + let r = r.route("/token", post(local::get_token)); + + #[cfg(feature = "auth-oidc")] + let r = r + .route("/login/oidc", get(oidc::oidc_login)) + .route("/callback", get(oidc::oidc_callback)); + + r +} + +/// Helper: create JWT for a user +#[cfg(feature = "auth-jwt")] +pub(super) fn create_jwt(user: &domain::User, state: &AppState) -> Result { + let validator = state + .jwt_validator + .as_ref() + .ok_or_else(|| ApiError::Internal("JWT not configured".to_string()))?; + + validator + .create_token(user) + .map_err(|e| ApiError::Internal(format!("Failed to create token: {}", e))) +} + +#[cfg(not(feature = "auth-jwt"))] +pub(super) fn create_jwt(_user: &domain::User, _state: &AppState) -> Result { + Err(ApiError::Internal("JWT feature not enabled".to_string())) +} diff --git a/k-tv-backend/api/src/routes/auth/oidc.rs b/k-tv-backend/api/src/routes/auth/oidc.rs new file mode 100644 index 0000000..d06e20b --- /dev/null +++ b/k-tv-backend/api/src/routes/auth/oidc.rs @@ -0,0 +1,124 @@ +#[cfg(feature = "auth-oidc")] +use axum::{ + Json, + extract::State, + http::header, + response::{IntoResponse, Response}, +}; + +#[cfg(feature = "auth-oidc")] +use crate::{ + dto::TokenResponse, + error::ApiError, + state::AppState, +}; + +#[cfg(feature = "auth-oidc")] +use super::create_jwt; + +#[cfg(feature = "auth-oidc")] +#[derive(serde::Deserialize)] +pub(super) struct CallbackParams { + pub code: String, + pub state: String, +} + +/// Start OIDC login: generate authorization URL and store state in encrypted cookie +#[cfg(feature = "auth-oidc")] +pub(super) async fn oidc_login( + State(state): State, + jar: axum_extra::extract::PrivateCookieJar, +) -> Result { + use axum_extra::extract::cookie::{Cookie, SameSite}; + + let service = state + .oidc_service + .as_ref() + .ok_or(ApiError::Internal("OIDC not configured".into()))?; + + let (auth_data, oidc_state) = service.get_authorization_url(); + + let state_json = serde_json::to_string(&oidc_state) + .map_err(|e| ApiError::Internal(format!("Failed to serialize OIDC state: {}", e)))?; + + let cookie = Cookie::build(("oidc_state", state_json)) + .max_age(time::Duration::minutes(5)) + .http_only(true) + .same_site(SameSite::Lax) + .secure(state.config.secure_cookie) + .path("/") + .build(); + + let updated_jar = jar.add(cookie); + + let redirect = axum::response::Redirect::to(auth_data.url.as_str()).into_response(); + let (mut parts, body) = redirect.into_parts(); + parts.headers.insert( + header::CACHE_CONTROL, + "no-cache, no-store, must-revalidate".parse().unwrap(), + ); + parts + .headers + .insert(header::PRAGMA, "no-cache".parse().unwrap()); + parts.headers.insert(header::EXPIRES, "0".parse().unwrap()); + + Ok((updated_jar, Response::from_parts(parts, body))) +} + +/// Handle OIDC callback: verify state cookie, complete exchange, issue JWT, clear cookie +#[cfg(feature = "auth-oidc")] +pub(super) async fn oidc_callback( + State(state): State, + jar: axum_extra::extract::PrivateCookieJar, + axum::extract::Query(params): axum::extract::Query, +) -> Result { + use infra::auth::oidc::OidcState; + + let service = state + .oidc_service + .as_ref() + .ok_or(ApiError::Internal("OIDC not configured".into()))?; + + // Read and decrypt OIDC state from cookie + let cookie = jar + .get("oidc_state") + .ok_or(ApiError::Validation("Missing OIDC state cookie".into()))?; + + let oidc_state: OidcState = serde_json::from_str(cookie.value()) + .map_err(|_| ApiError::Validation("Invalid OIDC state cookie".into()))?; + + // Verify CSRF token + if params.state != oidc_state.csrf_token.as_ref() { + return Err(ApiError::Validation("Invalid CSRF token".into())); + } + + // Complete OIDC exchange + let oidc_user = service + .resolve_callback( + domain::AuthorizationCode::new(params.code), + oidc_state.nonce, + oidc_state.pkce_verifier, + ) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + let user = state + .user_service + .find_or_create(&oidc_user.subject, &oidc_user.email) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + // Clear the OIDC state cookie + let cleared_jar = jar.remove(axum_extra::extract::cookie::Cookie::from("oidc_state")); + + let token = create_jwt(&user, &state)?; + + Ok(( + cleared_jar, + Json(TokenResponse { + access_token: token, + token_type: "Bearer".to_string(), + expires_in: state.config.jwt_expiry_hours * 3600, + }), + )) +} diff --git a/k-tv-backend/api/src/routes/channels.rs b/k-tv-backend/api/src/routes/channels.rs deleted file mode 100644 index fde3a5d..0000000 --- a/k-tv-backend/api/src/routes/channels.rs +++ /dev/null @@ -1,285 +0,0 @@ -//! Channel routes -//! -//! CRUD + schedule generation require authentication (Bearer JWT). -//! Viewing endpoints (list, now, epg, stream) are intentionally public so the -//! TV page works without login. - -use axum::{ - Json, Router, - extract::{Path, Query, State}, - http::StatusCode, - response::{IntoResponse, Redirect, Response}, - routing::{get, post}, -}; -use chrono::{DateTime, Utc}; -use serde::Deserialize; -use uuid::Uuid; - -use domain::{DomainError, ScheduleEngineService}; - -use crate::{ - dto::{ - ChannelResponse, CreateChannelRequest, CurrentBroadcastResponse, ScheduleResponse, - ScheduledSlotResponse, UpdateChannelRequest, - }, - error::ApiError, - extractors::CurrentUser, - state::AppState, -}; - -pub fn router() -> Router { - Router::new() - .route("/", get(list_channels).post(create_channel)) - .route( - "/{id}", - get(get_channel).put(update_channel).delete(delete_channel), - ) - .route( - "/{id}/schedule", - post(generate_schedule).get(get_active_schedule), - ) - .route("/{id}/now", get(get_current_broadcast)) - .route("/{id}/epg", get(get_epg)) - .route("/{id}/stream", get(get_stream)) -} - -// ============================================================================ -// Channel CRUD -// ============================================================================ - -async fn list_channels( - State(state): State, -) -> Result { - let channels = state.channel_service.find_all().await?; - let response: Vec = channels.into_iter().map(Into::into).collect(); - Ok(Json(response)) -} - -async fn create_channel( - State(state): State, - CurrentUser(user): CurrentUser, - Json(payload): Json, -) -> Result { - let mut channel = state - .channel_service - .create(user.id, &payload.name, &payload.timezone) - .await?; - - if let Some(desc) = payload.description { - channel.description = Some(desc); - channel = state.channel_service.update(channel).await?; - } - - Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel)))) -} - -async fn get_channel( - State(state): State, - CurrentUser(user): CurrentUser, - Path(channel_id): Path, -) -> Result { - let channel = state.channel_service.find_by_id(channel_id).await?; - require_owner(&channel, user.id)?; - Ok(Json(ChannelResponse::from(channel))) -} - -async fn update_channel( - State(state): State, - CurrentUser(user): CurrentUser, - Path(channel_id): Path, - Json(payload): Json, -) -> Result { - let mut channel = state.channel_service.find_by_id(channel_id).await?; - require_owner(&channel, user.id)?; - - if let Some(name) = payload.name { - channel.name = name; - } - if let Some(desc) = payload.description { - channel.description = Some(desc); - } - if let Some(tz) = payload.timezone { - channel.timezone = tz; - } - if let Some(sc) = payload.schedule_config { - channel.schedule_config = sc; - } - if let Some(rp) = payload.recycle_policy { - channel.recycle_policy = rp; - } - channel.updated_at = Utc::now(); - - let channel = state.channel_service.update(channel).await?; - Ok(Json(ChannelResponse::from(channel))) -} - -async fn delete_channel( - State(state): State, - CurrentUser(user): CurrentUser, - Path(channel_id): Path, -) -> Result { - // ChannelService::delete enforces ownership internally - state.channel_service.delete(channel_id, user.id).await?; - Ok(StatusCode::NO_CONTENT) -} - -// ============================================================================ -// Schedule generation + retrieval -// ============================================================================ - -/// Trigger 48-hour schedule generation for a channel, starting from now. -/// Replaces any existing schedule for the same window. -async fn generate_schedule( - State(state): State, - CurrentUser(user): CurrentUser, - Path(channel_id): Path, -) -> Result { - let channel = state.channel_service.find_by_id(channel_id).await?; - require_owner(&channel, user.id)?; - - let schedule = state - .schedule_engine - .generate_schedule(channel_id, Utc::now()) - .await?; - - Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule)))) -} - -/// Return the currently active 48-hour schedule for a channel. -/// 404 if no schedule has been generated yet — call POST /:id/schedule first. -async fn get_active_schedule( - State(state): State, - CurrentUser(user): CurrentUser, - Path(channel_id): Path, -) -> Result { - let channel = state.channel_service.find_by_id(channel_id).await?; - require_owner(&channel, user.id)?; - - let schedule = state - .schedule_engine - .get_active_schedule(channel_id, Utc::now()) - .await? - .ok_or(DomainError::NoActiveSchedule(channel_id))?; - - Ok(Json(ScheduleResponse::from(schedule))) -} - -// ============================================================================ -// Live broadcast endpoints -// ============================================================================ - -/// What is currently playing right now on this channel. -/// Returns 204 No Content when the channel is in a gap between blocks (no-signal). -async fn get_current_broadcast( - State(state): State, - Path(channel_id): Path, -) -> Result { - let _channel = state.channel_service.find_by_id(channel_id).await?; - - let now = Utc::now(); - let schedule = state - .schedule_engine - .get_active_schedule(channel_id, now) - .await? - .ok_or(DomainError::NoActiveSchedule(channel_id))?; - - match ScheduleEngineService::get_current_broadcast(&schedule, now) { - None => Ok(StatusCode::NO_CONTENT.into_response()), - Some(broadcast) => Ok(Json(CurrentBroadcastResponse { - slot: broadcast.slot.into(), - offset_secs: broadcast.offset_secs, - }) - .into_response()), - } -} - -/// EPG: return scheduled slots that overlap a time window. -/// -/// Query params (both RFC3339, both optional): -/// - `from` — start of window (default: now) -/// - `until` — end of window (default: now + 4 hours) -#[derive(Debug, Deserialize)] -struct EpgQuery { - from: Option, - until: Option, -} - -async fn get_epg( - State(state): State, - Path(channel_id): Path, - Query(params): Query, -) -> Result { - let _channel = state.channel_service.find_by_id(channel_id).await?; - - let now = Utc::now(); - let from = parse_optional_dt(params.from, now)?; - let until = parse_optional_dt(params.until, now + chrono::Duration::hours(4))?; - - if until <= from { - return Err(ApiError::validation("'until' must be after 'from'")); - } - - let schedule = state - .schedule_engine - .get_active_schedule(channel_id, from) - .await? - .ok_or(DomainError::NoActiveSchedule(channel_id))?; - - let slots: Vec = ScheduleEngineService::get_epg(&schedule, from, until) - .into_iter() - .cloned() - .map(Into::into) - .collect(); - - Ok(Json(slots)) -} - -/// Redirect to the stream URL for whatever is currently playing. -/// Returns 307 Temporary Redirect so the client fetches from the media provider directly. -/// Returns 204 No Content when the channel is in a gap (no-signal). -async fn get_stream( - State(state): State, - Path(channel_id): Path, -) -> Result { - let _channel = state.channel_service.find_by_id(channel_id).await?; - - let now = Utc::now(); - let schedule = state - .schedule_engine - .get_active_schedule(channel_id, now) - .await? - .ok_or(DomainError::NoActiveSchedule(channel_id))?; - - let broadcast = match ScheduleEngineService::get_current_broadcast(&schedule, now) { - None => return Ok(StatusCode::NO_CONTENT.into_response()), - Some(b) => b, - }; - - let url = state - .schedule_engine - .get_stream_url(&broadcast.slot.item.id) - .await?; - - Ok(Redirect::temporary(&url).into_response()) -} - -// ============================================================================ -// Helpers -// ============================================================================ - -fn require_owner(channel: &domain::Channel, user_id: Uuid) -> Result<(), ApiError> { - if channel.owner_id != user_id { - Err(ApiError::Forbidden("You don't own this channel".into())) - } else { - Ok(()) - } -} - -fn parse_optional_dt(s: Option, default: DateTime) -> Result, ApiError> { - match s { - None => Ok(default), - Some(raw) => DateTime::parse_from_rfc3339(&raw) - .map(|dt| dt.with_timezone(&Utc)) - .map_err(|_| ApiError::validation(format!("Invalid datetime '{}' — use RFC3339", raw))), - } -} diff --git a/k-tv-backend/api/src/routes/channels/broadcast.rs b/k-tv-backend/api/src/routes/channels/broadcast.rs new file mode 100644 index 0000000..6fb1f52 --- /dev/null +++ b/k-tv-backend/api/src/routes/channels/broadcast.rs @@ -0,0 +1,114 @@ +use axum::{ + Json, + extract::{Path, Query, State}, + http::StatusCode, + response::{IntoResponse, Redirect, Response}, +}; +use chrono::Utc; +use serde::Deserialize; +use uuid::Uuid; + +use domain::{DomainError, ScheduleEngineService}; + +use crate::{ + dto::{CurrentBroadcastResponse, ScheduledSlotResponse}, + error::ApiError, + state::AppState, +}; + +use super::parse_optional_dt; + +/// What is currently playing right now on this channel. +/// Returns 204 No Content when the channel is in a gap between blocks (no-signal). +pub(super) async fn get_current_broadcast( + State(state): State, + Path(channel_id): Path, +) -> Result { + let _channel = state.channel_service.find_by_id(channel_id).await?; + + let now = Utc::now(); + let schedule = state + .schedule_engine + .get_active_schedule(channel_id, now) + .await? + .ok_or(DomainError::NoActiveSchedule(channel_id))?; + + match ScheduleEngineService::get_current_broadcast(&schedule, now) { + None => Ok(StatusCode::NO_CONTENT.into_response()), + Some(broadcast) => Ok(Json(CurrentBroadcastResponse { + slot: broadcast.slot.into(), + offset_secs: broadcast.offset_secs, + }) + .into_response()), + } +} + +/// EPG: return scheduled slots that overlap a time window. +/// +/// Query params (both RFC3339, both optional): +/// - `from` — start of window (default: now) +/// - `until` — end of window (default: now + 4 hours) +#[derive(Debug, Deserialize)] +pub(super) struct EpgQuery { + from: Option, + until: Option, +} + +pub(super) async fn get_epg( + State(state): State, + Path(channel_id): Path, + Query(params): Query, +) -> Result { + let _channel = state.channel_service.find_by_id(channel_id).await?; + + let now = Utc::now(); + let from = parse_optional_dt(params.from, now)?; + let until = parse_optional_dt(params.until, now + chrono::Duration::hours(4))?; + + if until <= from { + return Err(ApiError::validation("'until' must be after 'from'")); + } + + let schedule = state + .schedule_engine + .get_active_schedule(channel_id, from) + .await? + .ok_or(DomainError::NoActiveSchedule(channel_id))?; + + let slots: Vec = ScheduleEngineService::get_epg(&schedule, from, until) + .into_iter() + .cloned() + .map(Into::into) + .collect(); + + Ok(Json(slots)) +} + +/// Redirect to the stream URL for whatever is currently playing. +/// Returns 307 Temporary Redirect so the client fetches from the media provider directly. +/// Returns 204 No Content when the channel is in a gap (no-signal). +pub(super) async fn get_stream( + State(state): State, + Path(channel_id): Path, +) -> Result { + let _channel = state.channel_service.find_by_id(channel_id).await?; + + let now = Utc::now(); + let schedule = state + .schedule_engine + .get_active_schedule(channel_id, now) + .await? + .ok_or(DomainError::NoActiveSchedule(channel_id))?; + + let broadcast = match ScheduleEngineService::get_current_broadcast(&schedule, now) { + None => return Ok(StatusCode::NO_CONTENT.into_response()), + Some(b) => b, + }; + + let url = state + .schedule_engine + .get_stream_url(&broadcast.slot.item.id) + .await?; + + Ok(Redirect::temporary(&url).into_response()) +} diff --git a/k-tv-backend/api/src/routes/channels/crud.rs b/k-tv-backend/api/src/routes/channels/crud.rs new file mode 100644 index 0000000..cae991c --- /dev/null +++ b/k-tv-backend/api/src/routes/channels/crud.rs @@ -0,0 +1,93 @@ +use axum::{ + Json, + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, +}; +use chrono::Utc; +use uuid::Uuid; + +use crate::{ + dto::{ChannelResponse, CreateChannelRequest, UpdateChannelRequest}, + error::ApiError, + extractors::CurrentUser, + state::AppState, +}; + +use super::require_owner; + +pub(super) async fn list_channels( + State(state): State, +) -> Result { + let channels = state.channel_service.find_all().await?; + let response: Vec = channels.into_iter().map(Into::into).collect(); + Ok(Json(response)) +} + +pub(super) async fn create_channel( + State(state): State, + CurrentUser(user): CurrentUser, + Json(payload): Json, +) -> Result { + let mut channel = state + .channel_service + .create(user.id, &payload.name, &payload.timezone) + .await?; + + if let Some(desc) = payload.description { + channel.description = Some(desc); + channel = state.channel_service.update(channel).await?; + } + + Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel)))) +} + +pub(super) async fn get_channel( + State(state): State, + CurrentUser(user): CurrentUser, + Path(channel_id): Path, +) -> Result { + let channel = state.channel_service.find_by_id(channel_id).await?; + require_owner(&channel, user.id)?; + Ok(Json(ChannelResponse::from(channel))) +} + +pub(super) async fn update_channel( + State(state): State, + CurrentUser(user): CurrentUser, + Path(channel_id): Path, + Json(payload): Json, +) -> Result { + let mut channel = state.channel_service.find_by_id(channel_id).await?; + require_owner(&channel, user.id)?; + + if let Some(name) = payload.name { + channel.name = name; + } + if let Some(desc) = payload.description { + channel.description = Some(desc); + } + if let Some(tz) = payload.timezone { + channel.timezone = tz; + } + if let Some(sc) = payload.schedule_config { + channel.schedule_config = sc; + } + if let Some(rp) = payload.recycle_policy { + channel.recycle_policy = rp; + } + channel.updated_at = Utc::now(); + + let channel = state.channel_service.update(channel).await?; + Ok(Json(ChannelResponse::from(channel))) +} + +pub(super) async fn delete_channel( + State(state): State, + CurrentUser(user): CurrentUser, + Path(channel_id): Path, +) -> Result { + // ChannelService::delete enforces ownership internally + state.channel_service.delete(channel_id, user.id).await?; + Ok(StatusCode::NO_CONTENT) +} diff --git a/k-tv-backend/api/src/routes/channels/mod.rs b/k-tv-backend/api/src/routes/channels/mod.rs new file mode 100644 index 0000000..a9b70b2 --- /dev/null +++ b/k-tv-backend/api/src/routes/channels/mod.rs @@ -0,0 +1,55 @@ +//! Channel routes +//! +//! CRUD + schedule generation require authentication (Bearer JWT). +//! Viewing endpoints (list, now, epg, stream) are intentionally public so the +//! TV page works without login. + +use axum::{Router, routing::{get, post}}; +use chrono::{DateTime, Utc}; +use uuid::Uuid; + +use crate::{error::ApiError, state::AppState}; + +mod broadcast; +mod crud; +mod schedule; + +pub fn router() -> Router { + Router::new() + .route("/", get(crud::list_channels).post(crud::create_channel)) + .route( + "/{id}", + get(crud::get_channel).put(crud::update_channel).delete(crud::delete_channel), + ) + .route( + "/{id}/schedule", + post(schedule::generate_schedule).get(schedule::get_active_schedule), + ) + .route("/{id}/now", get(broadcast::get_current_broadcast)) + .route("/{id}/epg", get(broadcast::get_epg)) + .route("/{id}/stream", get(broadcast::get_stream)) +} + +// ============================================================================ +// Shared helpers +// ============================================================================ + +pub(super) fn require_owner(channel: &domain::Channel, user_id: Uuid) -> Result<(), ApiError> { + if channel.owner_id != user_id { + Err(ApiError::Forbidden("You don't own this channel".into())) + } else { + Ok(()) + } +} + +pub(super) fn parse_optional_dt( + s: Option, + default: DateTime, +) -> Result, ApiError> { + match s { + None => Ok(default), + Some(raw) => DateTime::parse_from_rfc3339(&raw) + .map(|dt| dt.with_timezone(&Utc)) + .map_err(|_| ApiError::validation(format!("Invalid datetime '{}' — use RFC3339", raw))), + } +} diff --git a/k-tv-backend/api/src/routes/channels/schedule.rs b/k-tv-backend/api/src/routes/channels/schedule.rs new file mode 100644 index 0000000..4b1a567 --- /dev/null +++ b/k-tv-backend/api/src/routes/channels/schedule.rs @@ -0,0 +1,56 @@ +use axum::{ + Json, + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, +}; +use chrono::Utc; +use uuid::Uuid; + +use domain::DomainError; + +use crate::{ + dto::ScheduleResponse, + error::ApiError, + extractors::CurrentUser, + state::AppState, +}; + +use super::require_owner; + +/// Trigger 48-hour schedule generation for a channel, starting from now. +/// Replaces any existing schedule for the same window. +pub(super) async fn generate_schedule( + State(state): State, + CurrentUser(user): CurrentUser, + Path(channel_id): Path, +) -> Result { + let channel = state.channel_service.find_by_id(channel_id).await?; + require_owner(&channel, user.id)?; + + let schedule = state + .schedule_engine + .generate_schedule(channel_id, Utc::now()) + .await?; + + Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule)))) +} + +/// Return the currently active 48-hour schedule for a channel. +/// 404 if no schedule has been generated yet — call POST /:id/schedule first. +pub(super) async fn get_active_schedule( + State(state): State, + CurrentUser(user): CurrentUser, + Path(channel_id): Path, +) -> Result { + let channel = state.channel_service.find_by_id(channel_id).await?; + require_owner(&channel, user.id)?; + + let schedule = state + .schedule_engine + .get_active_schedule(channel_id, Utc::now()) + .await? + .ok_or(DomainError::NoActiveSchedule(channel_id))?; + + Ok(Json(ScheduleResponse::from(schedule))) +} diff --git a/k-tv-backend/domain/src/services/channel.rs b/k-tv-backend/domain/src/services/channel.rs new file mode 100644 index 0000000..04a56dd --- /dev/null +++ b/k-tv-backend/domain/src/services/channel.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use crate::entities::Channel; +use crate::errors::{DomainError, DomainResult}; +use crate::repositories::ChannelRepository; +use crate::value_objects::{ChannelId, UserId}; + +/// Service for managing channels (CRUD + ownership enforcement). +pub struct ChannelService { + channel_repo: Arc, +} + +impl ChannelService { + pub fn new(channel_repo: Arc) -> Self { + Self { channel_repo } + } + + pub async fn create( + &self, + owner_id: UserId, + name: &str, + timezone: &str, + ) -> DomainResult { + let channel = Channel::new(owner_id, name, timezone); + self.channel_repo.save(&channel).await?; + Ok(channel) + } + + pub async fn find_by_id(&self, id: ChannelId) -> DomainResult { + self.channel_repo + .find_by_id(id) + .await? + .ok_or(DomainError::ChannelNotFound(id)) + } + + pub async fn find_all(&self) -> DomainResult> { + self.channel_repo.find_all().await + } + + pub async fn find_by_owner(&self, owner_id: UserId) -> DomainResult> { + self.channel_repo.find_by_owner(owner_id).await + } + + pub async fn update(&self, channel: Channel) -> DomainResult { + self.channel_repo.save(&channel).await?; + Ok(channel) + } + + /// Delete a channel, enforcing that `requester_id` is the owner. + pub async fn delete(&self, id: ChannelId, requester_id: UserId) -> DomainResult<()> { + let channel = self.find_by_id(id).await?; + if channel.owner_id != requester_id { + return Err(DomainError::forbidden("You don't own this channel")); + } + self.channel_repo.delete(id).await + } +} diff --git a/k-tv-backend/domain/src/services/mod.rs b/k-tv-backend/domain/src/services/mod.rs new file mode 100644 index 0000000..3e567eb --- /dev/null +++ b/k-tv-backend/domain/src/services/mod.rs @@ -0,0 +1,11 @@ +//! Domain Services +//! +//! Services contain the business logic of the application. + +pub mod channel; +pub mod schedule; +pub mod user; + +pub use channel::ChannelService; +pub use schedule::ScheduleEngineService; +pub use user::UserService; diff --git a/k-tv-backend/domain/src/services/schedule/fill.rs b/k-tv-backend/domain/src/services/schedule/fill.rs new file mode 100644 index 0000000..a3dfa52 --- /dev/null +++ b/k-tv-backend/domain/src/services/schedule/fill.rs @@ -0,0 +1,119 @@ +use std::collections::HashSet; + +use rand::seq::SliceRandom; + +use crate::entities::MediaItem; +use crate::value_objects::{FillStrategy, MediaItemId}; + +pub(super) fn fill_block<'a>( + candidates: &'a [MediaItem], + pool: &'a [MediaItem], + target_secs: u32, + strategy: &FillStrategy, + last_item_id: Option<&MediaItemId>, +) -> Vec<&'a MediaItem> { + match strategy { + FillStrategy::BestFit => fill_best_fit(pool, target_secs), + FillStrategy::Sequential => fill_sequential(candidates, pool, target_secs, last_item_id), + FillStrategy::Random => { + let mut indices: Vec = (0..pool.len()).collect(); + indices.shuffle(&mut rand::thread_rng()); + let mut remaining = target_secs; + let mut result = Vec::new(); + for i in indices { + let item = &pool[i]; + if item.duration_secs <= remaining { + remaining -= item.duration_secs; + result.push(item); + } + } + result + } + } +} + +/// Greedy bin-packing: at each step pick the longest item that still fits +/// in the remaining budget, without repeating items within the same block. +pub(super) fn fill_best_fit(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> { + let mut remaining = target_secs; + let mut selected: Vec<&MediaItem> = Vec::new(); + let mut used: HashSet = HashSet::new(); + + loop { + let best = pool + .iter() + .enumerate() + .filter(|(idx, item)| { + !used.contains(idx) && item.duration_secs <= remaining + }) + .max_by_key(|(_, item)| item.duration_secs); + + match best { + Some((idx, item)) => { + remaining -= item.duration_secs; + used.insert(idx); + selected.push(item); + } + None => break, + } + } + + selected +} + +/// Sequential fill with cross-generation series continuity. +/// +/// `candidates` — all items matching the filter, in Jellyfin's natural order +/// (typically by season + episode number for TV shows). +/// `pool` — candidates filtered by the recycle policy (eligible to air). +/// `last_item_id` — the last item scheduled in this block in the previous +/// generation or in an earlier occurrence of this block within +/// the current generation. Used to resume the series from the +/// next episode rather than restarting from episode 1. +/// +/// Algorithm: +/// 1. Find `last_item_id`'s position in `candidates` and start from the next index. +/// 2. Walk the full `candidates` list in order (wrapping around at the end), +/// but only pick items that are in `pool` (i.e. not on cooldown). +/// 3. Greedily fill the time budget with items in that order. +/// +/// This ensures episodes always air in series order, the series wraps correctly +/// when the last episode has been reached, and cooldowns are still respected. +pub(super) fn fill_sequential<'a>( + candidates: &'a [MediaItem], + pool: &'a [MediaItem], + target_secs: u32, + last_item_id: Option<&MediaItemId>, +) -> Vec<&'a MediaItem> { + if pool.is_empty() { + return vec![]; + } + + // Set of item IDs currently eligible to air. + let available: HashSet<&MediaItemId> = pool.iter().map(|i| &i.id).collect(); + + // Find where in the full ordered list to resume. + // Falls back to index 0 if last_item_id is absent or was removed from the library. + let start_idx = last_item_id + .and_then(|id| candidates.iter().position(|c| &c.id == id)) + .map(|pos| (pos + 1) % candidates.len()) + .unwrap_or(0); + + // Walk candidates in order from start_idx, wrapping around once, + // skipping any that are on cooldown (not in `available`). + let ordered: Vec<&MediaItem> = (0..candidates.len()) + .map(|i| &candidates[(start_idx + i) % candidates.len()]) + .filter(|item| available.contains(&item.id)) + .collect(); + + // Greedily fill the block's time budget in episode order. + let mut remaining = target_secs; + let mut result = Vec::new(); + for item in ordered { + if item.duration_secs <= remaining { + remaining -= item.duration_secs; + result.push(item); + } + } + result +} diff --git a/k-tv-backend/domain/src/services.rs b/k-tv-backend/domain/src/services/schedule/mod.rs similarity index 51% rename from k-tv-backend/domain/src/services.rs rename to k-tv-backend/domain/src/services/schedule/mod.rs index ab1213d..d832f1e 100644 --- a/k-tv-backend/domain/src/services.rs +++ b/k-tv-backend/domain/src/services/schedule/mod.rs @@ -1,153 +1,23 @@ -//! Domain Services -//! -//! Services contain the business logic of the application. - -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use chrono::{DateTime, Duration, TimeZone, Utc}; use chrono_tz::Tz; -use rand::seq::SliceRandom; use uuid::Uuid; use crate::entities::{ - BlockContent, CurrentBroadcast, GeneratedSchedule, MediaItem, PlaybackRecord, - ProgrammingBlock, ScheduledSlot, + BlockContent, CurrentBroadcast, GeneratedSchedule, PlaybackRecord, ProgrammingBlock, + ScheduledSlot, }; use crate::errors::{DomainError, DomainResult}; use crate::ports::IMediaProvider; -use crate::repositories::{ChannelRepository, ScheduleRepository, UserRepository}; +use crate::repositories::{ChannelRepository, ScheduleRepository}; use crate::value_objects::{ - BlockId, ChannelId, Email, FillStrategy, MediaFilter, MediaItemId, RecyclePolicy, + BlockId, ChannelId, FillStrategy, MediaFilter, MediaItemId, RecyclePolicy, }; -// ============================================================================ -// UserService -// ============================================================================ - -/// Service for managing users. -pub struct UserService { - user_repository: Arc, -} - -impl UserService { - pub fn new(user_repository: Arc) -> Self { - Self { user_repository } - } - - pub async fn find_or_create(&self, subject: &str, email: &str) -> DomainResult { - if let Some(user) = self.user_repository.find_by_subject(subject).await? { - return Ok(user); - } - - if let Some(mut user) = self.user_repository.find_by_email(email).await? { - if user.subject != subject { - user.subject = subject.to_string(); - self.user_repository.save(&user).await?; - } - return Ok(user); - } - - let email = Email::try_from(email)?; - let user = crate::entities::User::new(subject, email); - self.user_repository.save(&user).await?; - Ok(user) - } - - pub async fn find_by_id(&self, id: Uuid) -> DomainResult { - self.user_repository - .find_by_id(id) - .await? - .ok_or(DomainError::UserNotFound(id)) - } - - pub async fn find_by_email(&self, email: &str) -> DomainResult> { - self.user_repository.find_by_email(email).await - } - - pub async fn create_local( - &self, - email: &str, - password_hash: &str, - ) -> DomainResult { - let email = Email::try_from(email)?; - let user = crate::entities::User::new_local(email, password_hash); - self.user_repository.save(&user).await?; - Ok(user) - } -} - -// ============================================================================ -// ChannelService -// ============================================================================ - -/// Service for managing channels (CRUD + ownership enforcement). -pub struct ChannelService { - channel_repo: Arc, -} - -impl ChannelService { - pub fn new(channel_repo: Arc) -> Self { - Self { channel_repo } - } - - pub async fn create( - &self, - owner_id: crate::value_objects::UserId, - name: &str, - timezone: &str, - ) -> DomainResult { - let channel = crate::entities::Channel::new(owner_id, name, timezone); - self.channel_repo.save(&channel).await?; - Ok(channel) - } - - pub async fn find_by_id( - &self, - id: ChannelId, - ) -> DomainResult { - self.channel_repo - .find_by_id(id) - .await? - .ok_or(DomainError::ChannelNotFound(id)) - } - - pub async fn find_all(&self) -> DomainResult> { - self.channel_repo.find_all().await - } - - pub async fn find_by_owner( - &self, - owner_id: crate::value_objects::UserId, - ) -> DomainResult> { - self.channel_repo.find_by_owner(owner_id).await - } - - pub async fn update( - &self, - channel: crate::entities::Channel, - ) -> DomainResult { - self.channel_repo.save(&channel).await?; - Ok(channel) - } - - /// Delete a channel, enforcing that `requester_id` is the owner. - pub async fn delete( - &self, - id: ChannelId, - requester_id: crate::value_objects::UserId, - ) -> DomainResult<()> { - let channel = self.find_by_id(id).await?; - if channel.owner_id != requester_id { - return Err(DomainError::forbidden("You don't own this channel")); - } - self.channel_repo.delete(id).await - } -} - -// ============================================================================ -// ScheduleEngineService -// ============================================================================ +mod fill; +mod recycle; /// Core scheduling engine. /// @@ -186,7 +56,7 @@ impl ScheduleEngineService { /// 3. Clip the interval to `[from, from + 48h)`. /// 4. Resolve the block content via the media provider, applying the recycle policy. /// 5. For `Sequential` blocks, resume from where the previous generation left off - /// (series continuity — see `fill_sequential`). + /// (series continuity — see `fill::fill_sequential`). /// 6. Record every played item in the playback history. /// /// Gaps between blocks are left empty — clients render them as a no-signal state. @@ -451,9 +321,9 @@ impl ScheduleEngineService { return Ok(vec![]); } - let pool = Self::apply_recycle_policy(&candidates, history, policy, generation); + let pool = recycle::apply_recycle_policy(&candidates, history, policy, generation); let target_secs = (end - start).num_seconds() as u32; - let selected = Self::fill_block(&candidates, &pool, target_secs, strategy, last_item_id); + let selected = fill::fill_block(&candidates, &pool, target_secs, strategy, last_item_id); let mut slots = Vec::new(); let mut cursor = start; @@ -476,178 +346,4 @@ impl ScheduleEngineService { Ok(slots) } - - // ------------------------------------------------------------------------- - // Recycle policy - // ------------------------------------------------------------------------- - - /// Filter `candidates` according to `policy`, returning the eligible pool. - /// - /// An item is on cooldown if *either* the day-based or generation-based - /// threshold is exceeded. If honouring all cooldowns would leave fewer items - /// than `policy.min_available_ratio` of the total, all cooldowns are waived - /// and the full pool is returned (prevents small libraries from stalling). - fn apply_recycle_policy( - candidates: &[MediaItem], - history: &[PlaybackRecord], - policy: &RecyclePolicy, - current_generation: u32, - ) -> Vec { - let now = Utc::now(); - - let excluded: HashSet = history - .iter() - .filter(|record| { - let by_days = policy - .cooldown_days - .map(|days| (now - record.played_at).num_days() < days as i64) - .unwrap_or(false); - - let by_gen = policy - .cooldown_generations - .map(|gens| { - current_generation.saturating_sub(record.generation) < gens - }) - .unwrap_or(false); - - by_days || by_gen - }) - .map(|r| r.item_id.clone()) - .collect(); - - let available: Vec = candidates - .iter() - .filter(|i| !excluded.contains(&i.id)) - .cloned() - .collect(); - - let min_count = - (candidates.len() as f32 * policy.min_available_ratio).ceil() as usize; - - if available.len() < min_count { - // Pool too small after applying cooldowns — recycle everything. - candidates.to_vec() - } else { - available - } - } - - // ------------------------------------------------------------------------- - // Fill strategies - // ------------------------------------------------------------------------- - - fn fill_block<'a>( - candidates: &'a [MediaItem], - pool: &'a [MediaItem], - target_secs: u32, - strategy: &FillStrategy, - last_item_id: Option<&MediaItemId>, - ) -> Vec<&'a MediaItem> { - match strategy { - FillStrategy::BestFit => Self::fill_best_fit(pool, target_secs), - FillStrategy::Sequential => { - Self::fill_sequential(candidates, pool, target_secs, last_item_id) - } - FillStrategy::Random => { - let mut indices: Vec = (0..pool.len()).collect(); - indices.shuffle(&mut rand::thread_rng()); - let mut remaining = target_secs; - let mut result = Vec::new(); - for i in indices { - let item = &pool[i]; - if item.duration_secs <= remaining { - remaining -= item.duration_secs; - result.push(item); - } - } - result - } - } - } - - /// Greedy bin-packing: at each step pick the longest item that still fits - /// in the remaining budget, without repeating items within the same block. - fn fill_best_fit(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> { - let mut remaining = target_secs; - let mut selected: Vec<&MediaItem> = Vec::new(); - let mut used: HashSet = HashSet::new(); - - loop { - let best = pool - .iter() - .enumerate() - .filter(|(idx, item)| { - !used.contains(idx) && item.duration_secs <= remaining - }) - .max_by_key(|(_, item)| item.duration_secs); - - match best { - Some((idx, item)) => { - remaining -= item.duration_secs; - used.insert(idx); - selected.push(item); - } - None => break, - } - } - - selected - } - - /// Sequential fill with cross-generation series continuity. - /// - /// `candidates` — all items matching the filter, in Jellyfin's natural order - /// (typically by season + episode number for TV shows). - /// `pool` — candidates filtered by the recycle policy (eligible to air). - /// `last_item_id` — the last item scheduled in this block in the previous - /// generation or in an earlier occurrence of this block within - /// the current generation. Used to resume the series from the - /// next episode rather than restarting from episode 1. - /// - /// Algorithm: - /// 1. Find `last_item_id`'s position in `candidates` and start from the next index. - /// 2. Walk the full `candidates` list in order (wrapping around at the end), - /// but only pick items that are in `pool` (i.e. not on cooldown). - /// 3. Greedily fill the time budget with items in that order. - /// - /// This ensures episodes always air in series order, the series wraps correctly - /// when the last episode has been reached, and cooldowns are still respected. - fn fill_sequential<'a>( - candidates: &'a [MediaItem], - pool: &'a [MediaItem], - target_secs: u32, - last_item_id: Option<&MediaItemId>, - ) -> Vec<&'a MediaItem> { - if pool.is_empty() { - return vec![]; - } - - // Set of item IDs currently eligible to air. - let available: HashSet<&MediaItemId> = pool.iter().map(|i| &i.id).collect(); - - // Find where in the full ordered list to resume. - // Falls back to index 0 if last_item_id is absent or was removed from the library. - let start_idx = last_item_id - .and_then(|id| candidates.iter().position(|c| &c.id == id)) - .map(|pos| (pos + 1) % candidates.len()) - .unwrap_or(0); - - // Walk candidates in order from start_idx, wrapping around once, - // skipping any that are on cooldown (not in `available`). - let ordered: Vec<&MediaItem> = (0..candidates.len()) - .map(|i| &candidates[(start_idx + i) % candidates.len()]) - .filter(|item| available.contains(&item.id)) - .collect(); - - // Greedily fill the block's time budget in episode order. - let mut remaining = target_secs; - let mut result = Vec::new(); - for item in ordered { - if item.duration_secs <= remaining { - remaining -= item.duration_secs; - result.push(item); - } - } - result - } } diff --git a/k-tv-backend/domain/src/services/schedule/recycle.rs b/k-tv-backend/domain/src/services/schedule/recycle.rs new file mode 100644 index 0000000..c29b6c3 --- /dev/null +++ b/k-tv-backend/domain/src/services/schedule/recycle.rs @@ -0,0 +1,55 @@ +use std::collections::HashSet; + +use chrono::Utc; + +use crate::entities::{MediaItem, PlaybackRecord}; +use crate::value_objects::{MediaItemId, RecyclePolicy}; + +/// Filter `candidates` according to `policy`, returning the eligible pool. +/// +/// An item is on cooldown if *either* the day-based or generation-based +/// threshold is exceeded. If honouring all cooldowns would leave fewer items +/// than `policy.min_available_ratio` of the total, all cooldowns are waived +/// and the full pool is returned (prevents small libraries from stalling). +pub(super) fn apply_recycle_policy( + candidates: &[MediaItem], + history: &[PlaybackRecord], + policy: &RecyclePolicy, + current_generation: u32, +) -> Vec { + let now = Utc::now(); + + let excluded: HashSet = history + .iter() + .filter(|record| { + let by_days = policy + .cooldown_days + .map(|days| (now - record.played_at).num_days() < days as i64) + .unwrap_or(false); + + let by_gen = policy + .cooldown_generations + .map(|gens| current_generation.saturating_sub(record.generation) < gens) + .unwrap_or(false); + + by_days || by_gen + }) + .map(|r| r.item_id.clone()) + .collect(); + + let available: Vec = candidates + .iter() + .filter(|i| !excluded.contains(&i.id)) + .cloned() + .collect(); + + let min_count = + (candidates.len() as f32 * policy.min_available_ratio).ceil() as usize; + + if available.len() < min_count { + // Pool too small after applying cooldowns — recycle everything. + candidates.to_vec() + } else { + available + } +} diff --git a/k-tv-backend/domain/src/services/user.rs b/k-tv-backend/domain/src/services/user.rs new file mode 100644 index 0000000..98b05e4 --- /dev/null +++ b/k-tv-backend/domain/src/services/user.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use uuid::Uuid; + +use crate::entities::User; +use crate::errors::{DomainError, DomainResult}; +use crate::repositories::UserRepository; +use crate::value_objects::Email; + +/// Service for managing users. +pub struct UserService { + user_repository: Arc, +} + +impl UserService { + pub fn new(user_repository: Arc) -> Self { + Self { user_repository } + } + + pub async fn find_or_create(&self, subject: &str, email: &str) -> DomainResult { + if let Some(user) = self.user_repository.find_by_subject(subject).await? { + return Ok(user); + } + + if let Some(mut user) = self.user_repository.find_by_email(email).await? { + if user.subject != subject { + user.subject = subject.to_string(); + self.user_repository.save(&user).await?; + } + return Ok(user); + } + + let email = Email::try_from(email)?; + let user = User::new(subject, email); + self.user_repository.save(&user).await?; + Ok(user) + } + + pub async fn find_by_id(&self, id: Uuid) -> DomainResult { + self.user_repository + .find_by_id(id) + .await? + .ok_or(DomainError::UserNotFound(id)) + } + + pub async fn find_by_email(&self, email: &str) -> DomainResult> { + self.user_repository.find_by_email(email).await + } + + pub async fn create_local( + &self, + email: &str, + password_hash: &str, + ) -> DomainResult { + let email = Email::try_from(email)?; + let user = User::new_local(email, password_hash); + self.user_repository.save(&user).await?; + Ok(user) + } +} diff --git a/k-tv-backend/domain/src/value_objects/auth.rs b/k-tv-backend/domain/src/value_objects/auth.rs new file mode 100644 index 0000000..6d5c3ba --- /dev/null +++ b/k-tv-backend/domain/src/value_objects/auth.rs @@ -0,0 +1,227 @@ +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt; +use thiserror::Error; + +// ============================================================================ +// Validation Error +// ============================================================================ + +/// Errors that occur when parsing/validating value objects +#[derive(Debug, Error, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum ValidationError { + #[error("Invalid email format: {0}")] + InvalidEmail(String), + + #[error("Password must be at least {min} characters, got {actual}")] + PasswordTooShort { min: usize, actual: usize }, + + #[error("Invalid URL: {0}")] + InvalidUrl(String), + + #[error("Value cannot be empty: {0}")] + Empty(String), + + #[error("Secret too short: minimum {min} bytes required, got {actual}")] + SecretTooShort { min: usize, actual: usize }, +} + +// ============================================================================ +// Email (using email_address crate for RFC-compliant validation) +// ============================================================================ + +/// A validated email address using RFC-compliant validation. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Email(email_address::EmailAddress); + +impl Email { + /// Create a new validated email address + pub fn new(value: impl AsRef) -> Result { + let value = value.as_ref().trim().to_lowercase(); + let addr: email_address::EmailAddress = value + .parse() + .map_err(|_| ValidationError::InvalidEmail(value.clone()))?; + Ok(Self(addr)) + } + + /// Get the inner value + pub fn into_inner(self) -> String { + self.0.to_string() + } +} + +impl AsRef for Email { + fn as_ref(&self) -> &str { + self.0.as_ref() + } +} + +impl fmt::Display for Email { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl TryFrom for Email { + type Error = ValidationError; + + fn try_from(value: String) -> Result { + Self::new(value) + } +} + +impl TryFrom<&str> for Email { + type Error = ValidationError; + + fn try_from(value: &str) -> Result { + Self::new(value) + } +} + +impl Serialize for Email { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_str(self.0.as_ref()) + } +} + +impl<'de> Deserialize<'de> for Email { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + Self::new(s).map_err(serde::de::Error::custom) + } +} + +// ============================================================================ +// Password +// ============================================================================ + +/// A validated password input (NOT the hash). +/// +/// Enforces minimum length of 6 characters. +#[derive(Clone, PartialEq, Eq)] +pub struct Password(String); + +/// Minimum password length (NIST recommendation) +pub const MIN_PASSWORD_LENGTH: usize = 8; + +impl Password { + pub fn new(value: impl Into) -> Result { + let value = value.into(); + + if value.len() < MIN_PASSWORD_LENGTH { + return Err(ValidationError::PasswordTooShort { + min: MIN_PASSWORD_LENGTH, + actual: value.len(), + }); + } + + Ok(Self(value)) + } + + pub fn into_inner(self) -> String { + self.0 + } +} + +impl AsRef for Password { + fn as_ref(&self) -> &str { + &self.0 + } +} + +// Intentionally hide password content in Debug +impl fmt::Debug for Password { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Password(***)") + } +} + +impl TryFrom for Password { + type Error = ValidationError; + + fn try_from(value: String) -> Result { + Self::new(value) + } +} + +impl TryFrom<&str> for Password { + type Error = ValidationError; + + fn try_from(value: &str) -> Result { + Self::new(value) + } +} + +impl<'de> Deserialize<'de> for Password { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + Self::new(s).map_err(serde::de::Error::custom) + } +} + +// Note: Password should NOT implement Serialize to prevent accidental exposure + +// ============================================================================ +// Tests +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + mod email_tests { + use super::*; + + #[test] + fn test_valid_email() { + assert!(Email::new("user@example.com").is_ok()); + assert!(Email::new("USER@EXAMPLE.COM").is_ok()); // Should lowercase + assert!(Email::new(" user@example.com ").is_ok()); // Should trim + } + + #[test] + fn test_email_normalizes() { + let email = Email::new(" USER@EXAMPLE.COM ").unwrap(); + assert_eq!(email.as_ref(), "user@example.com"); + } + + #[test] + fn test_invalid_email_no_at() { + assert!(Email::new("userexample.com").is_err()); + } + + #[test] + fn test_invalid_email_no_domain() { + assert!(Email::new("user@").is_err()); + } + + #[test] + fn test_invalid_email_no_local() { + assert!(Email::new("@example.com").is_err()); + } + } + + mod password_tests { + use super::*; + + #[test] + fn test_valid_password() { + assert!(Password::new("secret123").is_ok()); + assert!(Password::new("12345678").is_ok()); // Exactly 8 chars + } + + #[test] + fn test_password_too_short() { + assert!(Password::new("1234567").is_err()); // 7 chars + assert!(Password::new("").is_err()); + } + + #[test] + fn test_password_debug_hides_content() { + let password = Password::new("supersecret").unwrap(); + let debug = format!("{:?}", password); + assert!(!debug.contains("supersecret")); + assert!(debug.contains("***")); + } + } +} diff --git a/k-tv-backend/domain/src/value_objects/ids.rs b/k-tv-backend/domain/src/value_objects/ids.rs new file mode 100644 index 0000000..dc253b2 --- /dev/null +++ b/k-tv-backend/domain/src/value_objects/ids.rs @@ -0,0 +1,6 @@ +use uuid::Uuid; + +pub type UserId = Uuid; +pub type ChannelId = Uuid; +pub type SlotId = Uuid; +pub type BlockId = Uuid; diff --git a/k-tv-backend/domain/src/value_objects/mod.rs b/k-tv-backend/domain/src/value_objects/mod.rs new file mode 100644 index 0000000..cf2cfd7 --- /dev/null +++ b/k-tv-backend/domain/src/value_objects/mod.rs @@ -0,0 +1,14 @@ +//! Value Objects for K-Notes Domain +//! +//! Newtypes that encapsulate validation logic, following the "parse, don't validate" pattern. +//! These types can only be constructed if the input is valid, providing compile-time guarantees. + +pub mod auth; +pub mod ids; +pub mod oidc; +pub mod scheduling; + +pub use auth::*; +pub use ids::*; +pub use oidc::*; +pub use scheduling::*; diff --git a/k-tv-backend/domain/src/value_objects.rs b/k-tv-backend/domain/src/value_objects/oidc.rs similarity index 50% rename from k-tv-backend/domain/src/value_objects.rs rename to k-tv-backend/domain/src/value_objects/oidc.rs index 19e806d..3203502 100644 --- a/k-tv-backend/domain/src/value_objects.rs +++ b/k-tv-backend/domain/src/value_objects/oidc.rs @@ -1,174 +1,8 @@ -//! Value Objects for K-Notes Domain -//! -//! Newtypes that encapsulate validation logic, following the "parse, don't validate" pattern. -//! These types can only be constructed if the input is valid, providing compile-time guarantees. - -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde::{Deserialize, Deserializer, Serialize}; use std::fmt; -use thiserror::Error; use url::Url; -use uuid::Uuid; -pub type UserId = Uuid; - -// ============================================================================ -// Validation Error -// ============================================================================ - -/// Errors that occur when parsing/validating value objects -#[derive(Debug, Error, Clone, PartialEq, Eq)] -#[non_exhaustive] -pub enum ValidationError { - #[error("Invalid email format: {0}")] - InvalidEmail(String), - - #[error("Password must be at least {min} characters, got {actual}")] - PasswordTooShort { min: usize, actual: usize }, - - #[error("Invalid URL: {0}")] - InvalidUrl(String), - - #[error("Value cannot be empty: {0}")] - Empty(String), - - #[error("Secret too short: minimum {min} bytes required, got {actual}")] - SecretTooShort { min: usize, actual: usize }, -} - -// ============================================================================ -// Email (using email_address crate for RFC-compliant validation) -// ============================================================================ - -/// A validated email address using RFC-compliant validation. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Email(email_address::EmailAddress); - -impl Email { - /// Create a new validated email address - pub fn new(value: impl AsRef) -> Result { - let value = value.as_ref().trim().to_lowercase(); - let addr: email_address::EmailAddress = value - .parse() - .map_err(|_| ValidationError::InvalidEmail(value.clone()))?; - Ok(Self(addr)) - } - - /// Get the inner value - pub fn into_inner(self) -> String { - self.0.to_string() - } -} - -impl AsRef for Email { - fn as_ref(&self) -> &str { - self.0.as_ref() - } -} - -impl fmt::Display for Email { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl TryFrom for Email { - type Error = ValidationError; - - fn try_from(value: String) -> Result { - Self::new(value) - } -} - -impl TryFrom<&str> for Email { - type Error = ValidationError; - - fn try_from(value: &str) -> Result { - Self::new(value) - } -} - -impl Serialize for Email { - fn serialize(&self, serializer: S) -> Result { - serializer.serialize_str(self.0.as_ref()) - } -} - -impl<'de> Deserialize<'de> for Email { - fn deserialize>(deserializer: D) -> Result { - let s = String::deserialize(deserializer)?; - Self::new(s).map_err(serde::de::Error::custom) - } -} - -// ============================================================================ -// Password -// ============================================================================ - -/// A validated password input (NOT the hash). -/// -/// Enforces minimum length of 6 characters. -#[derive(Clone, PartialEq, Eq)] -pub struct Password(String); - -/// Minimum password length (NIST recommendation) -pub const MIN_PASSWORD_LENGTH: usize = 8; - -impl Password { - pub fn new(value: impl Into) -> Result { - let value = value.into(); - - if value.len() < MIN_PASSWORD_LENGTH { - return Err(ValidationError::PasswordTooShort { - min: MIN_PASSWORD_LENGTH, - actual: value.len(), - }); - } - - Ok(Self(value)) - } - - pub fn into_inner(self) -> String { - self.0 - } -} - -impl AsRef for Password { - fn as_ref(&self) -> &str { - &self.0 - } -} - -// Intentionally hide password content in Debug -impl fmt::Debug for Password { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Password(***)") - } -} - -impl TryFrom for Password { - type Error = ValidationError; - - fn try_from(value: String) -> Result { - Self::new(value) - } -} - -impl TryFrom<&str> for Password { - type Error = ValidationError; - - fn try_from(value: &str) -> Result { - Self::new(value) - } -} - -impl<'de> Deserialize<'de> for Password { - fn deserialize>(deserializer: D) -> Result { - let s = String::deserialize(deserializer)?; - Self::new(s).map_err(serde::de::Error::custom) - } -} - -// Note: Password should NOT implement Serialize to prevent accidental exposure +use super::auth::ValidationError; // ============================================================================ // OIDC Configuration Newtypes @@ -534,130 +368,6 @@ impl fmt::Debug for JwtSecret { } } -// ============================================================================ -// Channel / Schedule types -// ============================================================================ - -pub type ChannelId = Uuid; -pub type SlotId = Uuid; -pub type BlockId = Uuid; - -/// Opaque media item identifier — format is provider-specific internally. -/// The domain never inspects the string; it just passes it back to the provider. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct MediaItemId(String); - -impl MediaItemId { - pub fn new(value: impl Into) -> Self { - Self(value.into()) - } - - pub fn into_inner(self) -> String { - self.0 - } -} - -impl AsRef for MediaItemId { - fn as_ref(&self) -> &str { - &self.0 - } -} - -impl fmt::Display for MediaItemId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl From for MediaItemId { - fn from(s: String) -> Self { - Self(s) - } -} - -impl From<&str> for MediaItemId { - fn from(s: &str) -> Self { - Self(s.to_string()) - } -} - -/// The broad category of a media item. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum ContentType { - Movie, - Episode, - Short, -} - -/// Provider-agnostic filter for querying media items. -/// -/// Each field is optional — omitting it means "no constraint on this dimension". -/// The `IMediaProvider` adapter interprets these fields in terms of its own API. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub struct MediaFilter { - pub content_type: Option, - pub genres: Vec, - /// Starting year of a decade: 1990 means 1990–1999. - pub decade: Option, - pub tags: Vec, - pub min_duration_secs: Option, - pub max_duration_secs: Option, - /// Abstract groupings interpreted by each provider (Jellyfin library, Plex section, - /// filesystem path, etc.). An empty list means "all available content". - pub collections: Vec, - /// Filter to one or more TV series by name. Use with `content_type: Episode`. - /// With `Sequential` strategy each series plays in chronological order. - /// Multiple series are OR-combined: any episode from any listed show is eligible. - #[serde(default)] - pub series_names: Vec, - /// Free-text search term. Intended for library browsing; typically omitted - /// during schedule generation. - pub search_term: Option, -} - -/// How the scheduling engine fills a time block with selected media items. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum FillStrategy { - /// Greedy bin-packing: at each step pick the longest item that still fits, - /// minimising dead air. Good for variety blocks. - BestFit, - /// Pick items in the order returned by the provider — ideal for series - /// where episode sequence matters. - Sequential, - /// Shuffle the pool randomly then fill sequentially. Good for "shuffle play" channels. - Random, -} - -/// Controls when previously aired items become eligible to play again. -/// -/// An item is *on cooldown* if *either* threshold is met. -/// `min_available_ratio` is a safety valve: if honouring the cooldown would -/// leave fewer items than this fraction of the total pool, the cooldown is -/// ignored and all items become eligible. This prevents small libraries from -/// running completely dry. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RecyclePolicy { - /// Do not replay an item within this many calendar days. - pub cooldown_days: Option, - /// Do not replay an item within this many schedule generations. - pub cooldown_generations: Option, - /// Always keep at least this fraction (0.0–1.0) of the matching pool - /// available for selection, even if their cooldown has not yet expired. - pub min_available_ratio: f32, -} - -impl Default for RecyclePolicy { - fn default() -> Self { - Self { - cooldown_days: Some(30), - cooldown_generations: None, - min_available_ratio: 0.2, - } - } -} - // ============================================================================ // Tests // ============================================================================ @@ -666,62 +376,6 @@ impl Default for RecyclePolicy { mod tests { use super::*; - mod email_tests { - use super::*; - - #[test] - fn test_valid_email() { - assert!(Email::new("user@example.com").is_ok()); - assert!(Email::new("USER@EXAMPLE.COM").is_ok()); // Should lowercase - assert!(Email::new(" user@example.com ").is_ok()); // Should trim - } - - #[test] - fn test_email_normalizes() { - let email = Email::new(" USER@EXAMPLE.COM ").unwrap(); - assert_eq!(email.as_ref(), "user@example.com"); - } - - #[test] - fn test_invalid_email_no_at() { - assert!(Email::new("userexample.com").is_err()); - } - - #[test] - fn test_invalid_email_no_domain() { - assert!(Email::new("user@").is_err()); - } - - #[test] - fn test_invalid_email_no_local() { - assert!(Email::new("@example.com").is_err()); - } - } - - mod password_tests { - use super::*; - - #[test] - fn test_valid_password() { - assert!(Password::new("secret123").is_ok()); - assert!(Password::new("12345678").is_ok()); // Exactly 8 chars - } - - #[test] - fn test_password_too_short() { - assert!(Password::new("1234567").is_err()); // 7 chars - assert!(Password::new("").is_err()); - } - - #[test] - fn test_password_debug_hides_content() { - let password = Password::new("supersecret").unwrap(); - let debug = format!("{:?}", password); - assert!(!debug.contains("supersecret")); - assert!(debug.contains("***")); - } - } - mod oidc_tests { use super::*; diff --git a/k-tv-backend/domain/src/value_objects/scheduling.rs b/k-tv-backend/domain/src/value_objects/scheduling.rs new file mode 100644 index 0000000..bb0457d --- /dev/null +++ b/k-tv-backend/domain/src/value_objects/scheduling.rs @@ -0,0 +1,118 @@ +use serde::{Deserialize, Serialize}; +use std::fmt; + +/// Opaque media item identifier — format is provider-specific internally. +/// The domain never inspects the string; it just passes it back to the provider. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct MediaItemId(String); + +impl MediaItemId { + pub fn new(value: impl Into) -> Self { + Self(value.into()) + } + + pub fn into_inner(self) -> String { + self.0 + } +} + +impl AsRef for MediaItemId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for MediaItemId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for MediaItemId { + fn from(s: String) -> Self { + Self(s) + } +} + +impl From<&str> for MediaItemId { + fn from(s: &str) -> Self { + Self(s.to_string()) + } +} + +/// The broad category of a media item. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ContentType { + Movie, + Episode, + Short, +} + +/// Provider-agnostic filter for querying media items. +/// +/// Each field is optional — omitting it means "no constraint on this dimension". +/// The `IMediaProvider` adapter interprets these fields in terms of its own API. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct MediaFilter { + pub content_type: Option, + pub genres: Vec, + /// Starting year of a decade: 1990 means 1990–1999. + pub decade: Option, + pub tags: Vec, + pub min_duration_secs: Option, + pub max_duration_secs: Option, + /// Abstract groupings interpreted by each provider (Jellyfin library, Plex section, + /// filesystem path, etc.). An empty list means "all available content". + pub collections: Vec, + /// Filter to one or more TV series by name. Use with `content_type: Episode`. + /// With `Sequential` strategy each series plays in chronological order. + /// Multiple series are OR-combined: any episode from any listed show is eligible. + #[serde(default)] + pub series_names: Vec, + /// Free-text search term. Intended for library browsing; typically omitted + /// during schedule generation. + pub search_term: Option, +} + +/// How the scheduling engine fills a time block with selected media items. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum FillStrategy { + /// Greedy bin-packing: at each step pick the longest item that still fits, + /// minimising dead air. Good for variety blocks. + BestFit, + /// Pick items in the order returned by the provider — ideal for series + /// where episode sequence matters. + Sequential, + /// Shuffle the pool randomly then fill sequentially. Good for "shuffle play" channels. + Random, +} + +/// Controls when previously aired items become eligible to play again. +/// +/// An item is *on cooldown* if *either* threshold is met. +/// `min_available_ratio` is a safety valve: if honouring the cooldown would +/// leave fewer items than this fraction of the total pool, the cooldown is +/// ignored and all items become eligible. This prevents small libraries from +/// running completely dry. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RecyclePolicy { + /// Do not replay an item within this many calendar days. + pub cooldown_days: Option, + /// Do not replay an item within this many schedule generations. + pub cooldown_generations: Option, + /// Always keep at least this fraction (0.0–1.0) of the matching pool + /// available for selection, even if their cooldown has not yet expired. + pub min_available_ratio: f32, +} + +impl Default for RecyclePolicy { + fn default() -> Self { + Self { + cooldown_days: Some(30), + cooldown_generations: None, + min_available_ratio: 0.2, + } + } +} diff --git a/k-tv-backend/infra/src/channel_repository.rs b/k-tv-backend/infra/src/channel_repository.rs deleted file mode 100644 index 964311f..0000000 --- a/k-tv-backend/infra/src/channel_repository.rs +++ /dev/null @@ -1,275 +0,0 @@ -//! SQLite and PostgreSQL adapters for ChannelRepository - -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use sqlx::FromRow; -use uuid::Uuid; - -use domain::{ - Channel, ChannelId, ChannelRepository, DomainError, DomainResult, RecyclePolicy, - ScheduleConfig, UserId, -}; - -// ============================================================================ -// Row type + mapping (shared between SQLite and Postgres) -// ============================================================================ - -#[derive(Debug, FromRow)] -struct ChannelRow { - id: String, - owner_id: String, - name: String, - description: Option, - timezone: String, - schedule_config: String, - recycle_policy: String, - created_at: String, - updated_at: String, -} - -fn parse_dt(s: &str) -> Result, DomainError> { - DateTime::parse_from_rfc3339(s) - .map(|dt| dt.with_timezone(&Utc)) - .or_else(|_| { - chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map(|dt| dt.and_utc()) - }) - .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime '{}': {}", s, e))) -} - -impl TryFrom for Channel { - type Error = DomainError; - - fn try_from(row: ChannelRow) -> Result { - let id: ChannelId = Uuid::parse_str(&row.id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?; - let owner_id: UserId = Uuid::parse_str(&row.owner_id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid owner UUID: {}", e)))?; - let schedule_config: ScheduleConfig = serde_json::from_str(&row.schedule_config) - .map_err(|e| { - DomainError::RepositoryError(format!("Invalid schedule_config JSON: {}", e)) - })?; - let recycle_policy: RecyclePolicy = serde_json::from_str(&row.recycle_policy) - .map_err(|e| { - DomainError::RepositoryError(format!("Invalid recycle_policy JSON: {}", e)) - })?; - - Ok(Channel { - id, - owner_id, - name: row.name, - description: row.description, - timezone: row.timezone, - schedule_config, - recycle_policy, - created_at: parse_dt(&row.created_at)?, - updated_at: parse_dt(&row.updated_at)?, - }) - } -} - -const SELECT_COLS: &str = - "id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at"; - -// ============================================================================ -// SQLite adapter -// ============================================================================ - -#[cfg(feature = "sqlite")] -pub struct SqliteChannelRepository { - pool: sqlx::SqlitePool, -} - -#[cfg(feature = "sqlite")] -impl SqliteChannelRepository { - pub fn new(pool: sqlx::SqlitePool) -> Self { - Self { pool } - } -} - -#[cfg(feature = "sqlite")] -#[async_trait] -impl ChannelRepository for SqliteChannelRepository { - async fn find_by_id(&self, id: ChannelId) -> DomainResult> { - let sql = format!("SELECT {SELECT_COLS} FROM channels WHERE id = ?"); - let row: Option = sqlx::query_as(&sql) - .bind(id.to_string()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - row.map(Channel::try_from).transpose() - } - - async fn find_by_owner(&self, owner_id: UserId) -> DomainResult> { - let sql = format!( - "SELECT {SELECT_COLS} FROM channels WHERE owner_id = ? ORDER BY created_at ASC" - ); - let rows: Vec = sqlx::query_as(&sql) - .bind(owner_id.to_string()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - rows.into_iter().map(Channel::try_from).collect() - } - - async fn find_all(&self) -> DomainResult> { - let sql = format!("SELECT {SELECT_COLS} FROM channels ORDER BY created_at ASC"); - let rows: Vec = sqlx::query_as(&sql) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - rows.into_iter().map(Channel::try_from).collect() - } - - async fn save(&self, channel: &Channel) -> DomainResult<()> { - let schedule_config = serde_json::to_string(&channel.schedule_config).map_err(|e| { - DomainError::RepositoryError(format!("Failed to serialize schedule_config: {}", e)) - })?; - let recycle_policy = serde_json::to_string(&channel.recycle_policy).map_err(|e| { - DomainError::RepositoryError(format!("Failed to serialize recycle_policy: {}", e)) - })?; - - sqlx::query( - r#" - INSERT INTO channels - (id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - name = excluded.name, - description = excluded.description, - timezone = excluded.timezone, - schedule_config = excluded.schedule_config, - recycle_policy = excluded.recycle_policy, - updated_at = excluded.updated_at - "#, - ) - .bind(channel.id.to_string()) - .bind(channel.owner_id.to_string()) - .bind(&channel.name) - .bind(&channel.description) - .bind(&channel.timezone) - .bind(&schedule_config) - .bind(&recycle_policy) - .bind(channel.created_at.to_rfc3339()) - .bind(channel.updated_at.to_rfc3339()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - Ok(()) - } - - async fn delete(&self, id: ChannelId) -> DomainResult<()> { - sqlx::query("DELETE FROM channels WHERE id = ?") - .bind(id.to_string()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - Ok(()) - } -} - -// ============================================================================ -// PostgreSQL adapter -// ============================================================================ - -#[cfg(feature = "postgres")] -pub struct PostgresChannelRepository { - pool: sqlx::Pool, -} - -#[cfg(feature = "postgres")] -impl PostgresChannelRepository { - pub fn new(pool: sqlx::Pool) -> Self { - Self { pool } - } -} - -#[cfg(feature = "postgres")] -#[async_trait] -impl ChannelRepository for PostgresChannelRepository { - async fn find_by_id(&self, id: ChannelId) -> DomainResult> { - let sql = format!("SELECT {SELECT_COLS} FROM channels WHERE id = $1"); - let row: Option = sqlx::query_as(&sql) - .bind(id.to_string()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - row.map(Channel::try_from).transpose() - } - - async fn find_by_owner(&self, owner_id: UserId) -> DomainResult> { - let sql = format!( - "SELECT {SELECT_COLS} FROM channels WHERE owner_id = $1 ORDER BY created_at ASC" - ); - let rows: Vec = sqlx::query_as(&sql) - .bind(owner_id.to_string()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - rows.into_iter().map(Channel::try_from).collect() - } - - async fn find_all(&self) -> DomainResult> { - let sql = format!("SELECT {SELECT_COLS} FROM channels ORDER BY created_at ASC"); - let rows: Vec = sqlx::query_as(&sql) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - rows.into_iter().map(Channel::try_from).collect() - } - - async fn save(&self, channel: &Channel) -> DomainResult<()> { - let schedule_config = serde_json::to_string(&channel.schedule_config).map_err(|e| { - DomainError::RepositoryError(format!("Failed to serialize schedule_config: {}", e)) - })?; - let recycle_policy = serde_json::to_string(&channel.recycle_policy).map_err(|e| { - DomainError::RepositoryError(format!("Failed to serialize recycle_policy: {}", e)) - })?; - - sqlx::query( - r#" - INSERT INTO channels - (id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT(id) DO UPDATE SET - name = EXCLUDED.name, - description = EXCLUDED.description, - timezone = EXCLUDED.timezone, - schedule_config = EXCLUDED.schedule_config, - recycle_policy = EXCLUDED.recycle_policy, - updated_at = EXCLUDED.updated_at - "#, - ) - .bind(channel.id.to_string()) - .bind(channel.owner_id.to_string()) - .bind(&channel.name) - .bind(&channel.description) - .bind(&channel.timezone) - .bind(&schedule_config) - .bind(&recycle_policy) - .bind(channel.created_at.to_rfc3339()) - .bind(channel.updated_at.to_rfc3339()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - Ok(()) - } - - async fn delete(&self, id: ChannelId) -> DomainResult<()> { - sqlx::query("DELETE FROM channels WHERE id = $1") - .bind(id.to_string()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - Ok(()) - } -} diff --git a/k-tv-backend/infra/src/channel_repository/mapping.rs b/k-tv-backend/infra/src/channel_repository/mapping.rs new file mode 100644 index 0000000..b1410a0 --- /dev/null +++ b/k-tv-backend/infra/src/channel_repository/mapping.rs @@ -0,0 +1,61 @@ +use chrono::{DateTime, Utc}; +use sqlx::FromRow; +use uuid::Uuid; + +use domain::{Channel, ChannelId, DomainError, RecyclePolicy, ScheduleConfig, UserId}; + +#[derive(Debug, FromRow)] +pub(super) struct ChannelRow { + pub id: String, + pub owner_id: String, + pub name: String, + pub description: Option, + pub timezone: String, + pub schedule_config: String, + pub recycle_policy: String, + pub created_at: String, + pub updated_at: String, +} + +pub(super) fn parse_dt(s: &str) -> Result, DomainError> { + DateTime::parse_from_rfc3339(s) + .map(|dt| dt.with_timezone(&Utc)) + .or_else(|_| { + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map(|dt| dt.and_utc()) + }) + .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime '{}': {}", s, e))) +} + +impl TryFrom for Channel { + type Error = DomainError; + + fn try_from(row: ChannelRow) -> Result { + let id: ChannelId = Uuid::parse_str(&row.id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?; + let owner_id: UserId = Uuid::parse_str(&row.owner_id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid owner UUID: {}", e)))?; + let schedule_config: ScheduleConfig = serde_json::from_str(&row.schedule_config) + .map_err(|e| { + DomainError::RepositoryError(format!("Invalid schedule_config JSON: {}", e)) + })?; + let recycle_policy: RecyclePolicy = serde_json::from_str(&row.recycle_policy) + .map_err(|e| { + DomainError::RepositoryError(format!("Invalid recycle_policy JSON: {}", e)) + })?; + + Ok(Channel { + id, + owner_id, + name: row.name, + description: row.description, + timezone: row.timezone, + schedule_config, + recycle_policy, + created_at: parse_dt(&row.created_at)?, + updated_at: parse_dt(&row.updated_at)?, + }) + } +} + +pub(super) const SELECT_COLS: &str = + "id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at"; diff --git a/k-tv-backend/infra/src/channel_repository/mod.rs b/k-tv-backend/infra/src/channel_repository/mod.rs new file mode 100644 index 0000000..6a4de63 --- /dev/null +++ b/k-tv-backend/infra/src/channel_repository/mod.rs @@ -0,0 +1,13 @@ +//! SQLite and PostgreSQL adapters for ChannelRepository + +mod mapping; + +#[cfg(feature = "sqlite")] +mod sqlite; +#[cfg(feature = "postgres")] +mod postgres; + +#[cfg(feature = "sqlite")] +pub use sqlite::SqliteChannelRepository; +#[cfg(feature = "postgres")] +pub use postgres::PostgresChannelRepository; diff --git a/k-tv-backend/infra/src/channel_repository/postgres.rs b/k-tv-backend/infra/src/channel_repository/postgres.rs new file mode 100644 index 0000000..a28fa1e --- /dev/null +++ b/k-tv-backend/infra/src/channel_repository/postgres.rs @@ -0,0 +1,100 @@ +use async_trait::async_trait; + +use domain::{Channel, ChannelId, ChannelRepository, DomainError, DomainResult, UserId}; + +use super::mapping::{ChannelRow, SELECT_COLS}; + +pub struct PostgresChannelRepository { + pool: sqlx::Pool, +} + +impl PostgresChannelRepository { + pub fn new(pool: sqlx::Pool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl ChannelRepository for PostgresChannelRepository { + async fn find_by_id(&self, id: ChannelId) -> DomainResult> { + let sql = format!("SELECT {SELECT_COLS} FROM channels WHERE id = $1"); + let row: Option = sqlx::query_as(&sql) + .bind(id.to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + row.map(Channel::try_from).transpose() + } + + async fn find_by_owner(&self, owner_id: UserId) -> DomainResult> { + let sql = format!( + "SELECT {SELECT_COLS} FROM channels WHERE owner_id = $1 ORDER BY created_at ASC" + ); + let rows: Vec = sqlx::query_as(&sql) + .bind(owner_id.to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + rows.into_iter().map(Channel::try_from).collect() + } + + async fn find_all(&self) -> DomainResult> { + let sql = format!("SELECT {SELECT_COLS} FROM channels ORDER BY created_at ASC"); + let rows: Vec = sqlx::query_as(&sql) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + rows.into_iter().map(Channel::try_from).collect() + } + + async fn save(&self, channel: &Channel) -> DomainResult<()> { + let schedule_config = serde_json::to_string(&channel.schedule_config).map_err(|e| { + DomainError::RepositoryError(format!("Failed to serialize schedule_config: {}", e)) + })?; + let recycle_policy = serde_json::to_string(&channel.recycle_policy).map_err(|e| { + DomainError::RepositoryError(format!("Failed to serialize recycle_policy: {}", e)) + })?; + + sqlx::query( + r#" + INSERT INTO channels + (id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT(id) DO UPDATE SET + name = EXCLUDED.name, + description = EXCLUDED.description, + timezone = EXCLUDED.timezone, + schedule_config = EXCLUDED.schedule_config, + recycle_policy = EXCLUDED.recycle_policy, + updated_at = EXCLUDED.updated_at + "#, + ) + .bind(channel.id.to_string()) + .bind(channel.owner_id.to_string()) + .bind(&channel.name) + .bind(&channel.description) + .bind(&channel.timezone) + .bind(&schedule_config) + .bind(&recycle_policy) + .bind(channel.created_at.to_rfc3339()) + .bind(channel.updated_at.to_rfc3339()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(()) + } + + async fn delete(&self, id: ChannelId) -> DomainResult<()> { + sqlx::query("DELETE FROM channels WHERE id = $1") + .bind(id.to_string()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(()) + } +} diff --git a/k-tv-backend/infra/src/channel_repository/sqlite.rs b/k-tv-backend/infra/src/channel_repository/sqlite.rs new file mode 100644 index 0000000..f6e5398 --- /dev/null +++ b/k-tv-backend/infra/src/channel_repository/sqlite.rs @@ -0,0 +1,100 @@ +use async_trait::async_trait; + +use domain::{Channel, ChannelId, ChannelRepository, DomainError, DomainResult, UserId}; + +use super::mapping::{ChannelRow, SELECT_COLS}; + +pub struct SqliteChannelRepository { + pool: sqlx::SqlitePool, +} + +impl SqliteChannelRepository { + pub fn new(pool: sqlx::SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl ChannelRepository for SqliteChannelRepository { + async fn find_by_id(&self, id: ChannelId) -> DomainResult> { + let sql = format!("SELECT {SELECT_COLS} FROM channels WHERE id = ?"); + let row: Option = sqlx::query_as(&sql) + .bind(id.to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + row.map(Channel::try_from).transpose() + } + + async fn find_by_owner(&self, owner_id: UserId) -> DomainResult> { + let sql = format!( + "SELECT {SELECT_COLS} FROM channels WHERE owner_id = ? ORDER BY created_at ASC" + ); + let rows: Vec = sqlx::query_as(&sql) + .bind(owner_id.to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + rows.into_iter().map(Channel::try_from).collect() + } + + async fn find_all(&self) -> DomainResult> { + let sql = format!("SELECT {SELECT_COLS} FROM channels ORDER BY created_at ASC"); + let rows: Vec = sqlx::query_as(&sql) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + rows.into_iter().map(Channel::try_from).collect() + } + + async fn save(&self, channel: &Channel) -> DomainResult<()> { + let schedule_config = serde_json::to_string(&channel.schedule_config).map_err(|e| { + DomainError::RepositoryError(format!("Failed to serialize schedule_config: {}", e)) + })?; + let recycle_policy = serde_json::to_string(&channel.recycle_policy).map_err(|e| { + DomainError::RepositoryError(format!("Failed to serialize recycle_policy: {}", e)) + })?; + + sqlx::query( + r#" + INSERT INTO channels + (id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + name = excluded.name, + description = excluded.description, + timezone = excluded.timezone, + schedule_config = excluded.schedule_config, + recycle_policy = excluded.recycle_policy, + updated_at = excluded.updated_at + "#, + ) + .bind(channel.id.to_string()) + .bind(channel.owner_id.to_string()) + .bind(&channel.name) + .bind(&channel.description) + .bind(&channel.timezone) + .bind(&schedule_config) + .bind(&recycle_policy) + .bind(channel.created_at.to_rfc3339()) + .bind(channel.updated_at.to_rfc3339()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(()) + } + + async fn delete(&self, id: ChannelId) -> DomainResult<()> { + sqlx::query("DELETE FROM channels WHERE id = ?") + .bind(id.to_string()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(()) + } +} diff --git a/k-tv-backend/infra/src/jellyfin/config.rs b/k-tv-backend/infra/src/jellyfin/config.rs new file mode 100644 index 0000000..986fb54 --- /dev/null +++ b/k-tv-backend/infra/src/jellyfin/config.rs @@ -0,0 +1,10 @@ +/// Connection details for a single Jellyfin instance. +#[derive(Debug, Clone)] +pub struct JellyfinConfig { + /// e.g. `"http://192.168.1.10:8096"` — no trailing slash + pub base_url: String, + /// Jellyfin API key (Settings → API Keys) + pub api_key: String, + /// The Jellyfin user ID used for library browsing + pub user_id: String, +} diff --git a/k-tv-backend/infra/src/jellyfin/mapping.rs b/k-tv-backend/infra/src/jellyfin/mapping.rs new file mode 100644 index 0000000..18c353f --- /dev/null +++ b/k-tv-backend/infra/src/jellyfin/mapping.rs @@ -0,0 +1,35 @@ +use domain::{ContentType, MediaItem, MediaItemId}; + +use super::models::JellyfinItem; + +/// Ticks are Jellyfin's time unit: 1 tick = 100 nanoseconds → 10,000,000 ticks/sec. +pub(super) const TICKS_PER_SEC: i64 = 10_000_000; + +/// Map a raw Jellyfin item to a domain `MediaItem`. Returns `None` for unknown +/// item types (e.g. Season, Series, Folder) so they are silently skipped. +pub(super) fn map_jellyfin_item(item: JellyfinItem) -> Option { + let content_type = match item.item_type.as_str() { + "Movie" => ContentType::Movie, + "Episode" => ContentType::Episode, + _ => return None, + }; + + let duration_secs = item + .run_time_ticks + .map(|t| (t / TICKS_PER_SEC) as u32) + .unwrap_or(0); + + Some(MediaItem { + id: MediaItemId::new(item.id), + title: item.name, + content_type, + duration_secs, + description: item.overview, + genres: item.genres.unwrap_or_default(), + year: item.production_year, + tags: item.tags.unwrap_or_default(), + series_name: item.series_name, + season_number: item.parent_index_number, + episode_number: item.index_number, + }) +} diff --git a/k-tv-backend/infra/src/jellyfin/mod.rs b/k-tv-backend/infra/src/jellyfin/mod.rs new file mode 100644 index 0000000..3eb1194 --- /dev/null +++ b/k-tv-backend/infra/src/jellyfin/mod.rs @@ -0,0 +1,15 @@ +//! Jellyfin media provider adapter +//! +//! Implements [`IMediaProvider`] by talking to the Jellyfin HTTP API. +//! The domain never sees Jellyfin-specific types — this module translates +//! between Jellyfin's API model and the domain's abstract `MediaItem`/`MediaFilter`. + +#![cfg(feature = "jellyfin")] + +mod config; +mod mapping; +mod models; +mod provider; + +pub use config::JellyfinConfig; +pub use provider::JellyfinMediaProvider; diff --git a/k-tv-backend/infra/src/jellyfin/models.rs b/k-tv-backend/infra/src/jellyfin/models.rs new file mode 100644 index 0000000..c11c304 --- /dev/null +++ b/k-tv-backend/infra/src/jellyfin/models.rs @@ -0,0 +1,57 @@ +use serde::Deserialize; + +use domain::ContentType; + +// ============================================================================ +// Jellyfin API response types +// ============================================================================ + +#[derive(Debug, Deserialize)] +pub(super) struct JellyfinItemsResponse { + #[serde(rename = "Items")] + pub items: Vec, +} + +#[derive(Debug, Deserialize)] +pub(super) struct JellyfinItem { + #[serde(rename = "Id")] + pub id: String, + #[serde(rename = "Name")] + pub name: String, + #[serde(rename = "Type")] + pub item_type: String, + #[serde(rename = "RunTimeTicks")] + pub run_time_ticks: Option, + #[serde(rename = "Overview")] + pub overview: Option, + #[serde(rename = "Genres")] + pub genres: Option>, + #[serde(rename = "ProductionYear")] + pub production_year: Option, + #[serde(rename = "Tags")] + pub tags: Option>, + /// TV show name (episodes only) + #[serde(rename = "SeriesName")] + pub series_name: Option, + /// Season number (episodes only) + #[serde(rename = "ParentIndexNumber")] + pub parent_index_number: Option, + /// Episode number within the season (episodes only) + #[serde(rename = "IndexNumber")] + pub index_number: Option, + /// Collection type for virtual library folders (e.g. "movies", "tvshows") + #[serde(rename = "CollectionType")] + pub collection_type: Option, + /// Total number of child items (used for Series to count episodes) + #[serde(rename = "RecursiveItemCount")] + pub recursive_item_count: Option, +} + +pub(super) fn jellyfin_item_type(ct: &ContentType) -> &'static str { + match ct { + ContentType::Movie => "Movie", + ContentType::Episode => "Episode", + // Jellyfin has no native "Short" type; short films are filed as Movies + ContentType::Short => "Movie", + } +} diff --git a/k-tv-backend/infra/src/jellyfin.rs b/k-tv-backend/infra/src/jellyfin/provider.rs similarity index 75% rename from k-tv-backend/infra/src/jellyfin.rs rename to k-tv-backend/infra/src/jellyfin/provider.rs index 07bfd6d..71dbfd8 100644 --- a/k-tv-backend/infra/src/jellyfin.rs +++ b/k-tv-backend/infra/src/jellyfin/provider.rs @@ -1,41 +1,17 @@ -//! Jellyfin media provider adapter -//! -//! Implements [`IMediaProvider`] by talking to the Jellyfin HTTP API. -//! The domain never sees Jellyfin-specific types — this module translates -//! between Jellyfin's API model and the domain's abstract `MediaItem`/`MediaFilter`. - -#![cfg(feature = "jellyfin")] - use async_trait::async_trait; -use serde::Deserialize; -use domain::{Collection, ContentType, DomainError, DomainResult, IMediaProvider, MediaFilter, MediaItem, MediaItemId, SeriesSummary}; +use domain::{ + Collection, ContentType, DomainError, DomainResult, IMediaProvider, MediaFilter, MediaItem, + MediaItemId, SeriesSummary, +}; -/// Ticks are Jellyfin's time unit: 1 tick = 100 nanoseconds → 10,000,000 ticks/sec. -const TICKS_PER_SEC: i64 = 10_000_000; - -// ============================================================================ -// Configuration -// ============================================================================ - -/// Connection details for a single Jellyfin instance. -#[derive(Debug, Clone)] -pub struct JellyfinConfig { - /// e.g. `"http://192.168.1.10:8096"` — no trailing slash - pub base_url: String, - /// Jellyfin API key (Settings → API Keys) - pub api_key: String, - /// The Jellyfin user ID used for library browsing - pub user_id: String, -} - -// ============================================================================ -// Adapter -// ============================================================================ +use super::config::JellyfinConfig; +use super::mapping::{map_jellyfin_item, TICKS_PER_SEC}; +use super::models::{jellyfin_item_type, JellyfinItemsResponse}; pub struct JellyfinMediaProvider { - client: reqwest::Client, - config: JellyfinConfig, + pub(super) client: reqwest::Client, + pub(super) config: JellyfinConfig, } impl JellyfinMediaProvider { @@ -48,9 +24,7 @@ impl JellyfinMediaProvider { }, } } -} -impl JellyfinMediaProvider { /// Inner fetch: applies all filter fields plus an optional series name override. async fn fetch_items_for_series( &self, @@ -151,7 +125,6 @@ impl JellyfinMediaProvider { Ok(items) } - } #[async_trait] @@ -396,90 +369,3 @@ impl IMediaProvider for JellyfinMediaProvider { )) } } - -// ============================================================================ -// Jellyfin API response types -// ============================================================================ - -#[derive(Debug, Deserialize)] -struct JellyfinItemsResponse { - #[serde(rename = "Items")] - items: Vec, -} - -#[derive(Debug, Deserialize)] -struct JellyfinItem { - #[serde(rename = "Id")] - id: String, - #[serde(rename = "Name")] - name: String, - #[serde(rename = "Type")] - item_type: String, - #[serde(rename = "RunTimeTicks")] - run_time_ticks: Option, - #[serde(rename = "Overview")] - overview: Option, - #[serde(rename = "Genres")] - genres: Option>, - #[serde(rename = "ProductionYear")] - production_year: Option, - #[serde(rename = "Tags")] - tags: Option>, - /// TV show name (episodes only) - #[serde(rename = "SeriesName")] - series_name: Option, - /// Season number (episodes only) - #[serde(rename = "ParentIndexNumber")] - parent_index_number: Option, - /// Episode number within the season (episodes only) - #[serde(rename = "IndexNumber")] - index_number: Option, - /// Collection type for virtual library folders (e.g. "movies", "tvshows") - #[serde(rename = "CollectionType")] - collection_type: Option, - /// Total number of child items (used for Series to count episodes) - #[serde(rename = "RecursiveItemCount")] - recursive_item_count: Option, -} - -// ============================================================================ -// Mapping helpers -// ============================================================================ - -fn jellyfin_item_type(ct: &ContentType) -> &'static str { - match ct { - ContentType::Movie => "Movie", - ContentType::Episode => "Episode", - // Jellyfin has no native "Short" type; short films are filed as Movies - ContentType::Short => "Movie", - } -} - -/// Map a raw Jellyfin item to a domain `MediaItem`. Returns `None` for unknown -/// item types (e.g. Season, Series, Folder) so they are silently skipped. -fn map_jellyfin_item(item: JellyfinItem) -> Option { - let content_type = match item.item_type.as_str() { - "Movie" => ContentType::Movie, - "Episode" => ContentType::Episode, - _ => return None, - }; - - let duration_secs = item - .run_time_ticks - .map(|t| (t / TICKS_PER_SEC) as u32) - .unwrap_or(0); - - Some(MediaItem { - id: MediaItemId::new(item.id), - title: item.name, - content_type, - duration_secs, - description: item.overview, - genres: item.genres.unwrap_or_default(), - year: item.production_year, - tags: item.tags.unwrap_or_default(), - series_name: item.series_name, - season_number: item.parent_index_number, - episode_number: item.index_number, - }) -} diff --git a/k-tv-backend/infra/src/schedule_repository.rs b/k-tv-backend/infra/src/schedule_repository.rs deleted file mode 100644 index aeeeedb..0000000 --- a/k-tv-backend/infra/src/schedule_repository.rs +++ /dev/null @@ -1,447 +0,0 @@ -//! SQLite and PostgreSQL adapters for ScheduleRepository - -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use sqlx::FromRow; -use uuid::Uuid; - -use domain::{ - ChannelId, DomainError, DomainResult, GeneratedSchedule, MediaItem, MediaItemId, - PlaybackRecord, ScheduleRepository, ScheduledSlot, -}; - -// ============================================================================ -// Row types -// ============================================================================ - -#[derive(Debug, FromRow)] -struct ScheduleRow { - id: String, - channel_id: String, - valid_from: String, - valid_until: String, - generation: i64, -} - -#[derive(Debug, FromRow)] -struct SlotRow { - id: String, - // schedule_id selected but only used to drive the JOIN; not needed for domain type - #[allow(dead_code)] - schedule_id: String, - start_at: String, - end_at: String, - item: String, - source_block_id: String, -} - -#[derive(Debug, FromRow)] -struct PlaybackRecordRow { - id: String, - channel_id: String, - item_id: String, - played_at: String, - generation: i64, -} - -// ============================================================================ -// Mapping -// ============================================================================ - -fn parse_dt(s: &str) -> Result, DomainError> { - DateTime::parse_from_rfc3339(s) - .map(|dt| dt.with_timezone(&Utc)) - .or_else(|_| { - chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map(|dt| dt.and_utc()) - }) - .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime '{}': {}", s, e))) -} - -fn map_slot_row(row: SlotRow) -> Result { - let id = Uuid::parse_str(&row.id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid slot UUID: {}", e)))?; - let source_block_id = Uuid::parse_str(&row.source_block_id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid block UUID: {}", e)))?; - let item: MediaItem = serde_json::from_str(&row.item) - .map_err(|e| DomainError::RepositoryError(format!("Invalid slot item JSON: {}", e)))?; - - Ok(ScheduledSlot { - id, - start_at: parse_dt(&row.start_at)?, - end_at: parse_dt(&row.end_at)?, - item, - source_block_id, - }) -} - -fn map_schedule(row: ScheduleRow, slot_rows: Vec) -> Result { - let id = Uuid::parse_str(&row.id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid schedule UUID: {}", e)))?; - let channel_id = Uuid::parse_str(&row.channel_id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?; - - let slots: Result, _> = slot_rows.into_iter().map(map_slot_row).collect(); - - Ok(GeneratedSchedule { - id, - channel_id, - valid_from: parse_dt(&row.valid_from)?, - valid_until: parse_dt(&row.valid_until)?, - generation: row.generation as u32, - slots: slots?, - }) -} - -impl TryFrom for PlaybackRecord { - type Error = DomainError; - - fn try_from(row: PlaybackRecordRow) -> Result { - let id = Uuid::parse_str(&row.id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; - let channel_id = Uuid::parse_str(&row.channel_id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?; - - Ok(PlaybackRecord { - id, - channel_id, - item_id: MediaItemId::new(row.item_id), - played_at: parse_dt(&row.played_at)?, - generation: row.generation as u32, - }) - } -} - -// ============================================================================ -// SQLite adapter -// ============================================================================ - -#[cfg(feature = "sqlite")] -pub struct SqliteScheduleRepository { - pool: sqlx::SqlitePool, -} - -#[cfg(feature = "sqlite")] -impl SqliteScheduleRepository { - pub fn new(pool: sqlx::SqlitePool) -> Self { - Self { pool } - } - - async fn fetch_slots(&self, schedule_id: &str) -> DomainResult> { - sqlx::query_as( - "SELECT id, schedule_id, start_at, end_at, item, source_block_id \ - FROM scheduled_slots WHERE schedule_id = ? ORDER BY start_at", - ) - .bind(schedule_id) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string())) - } -} - -#[cfg(feature = "sqlite")] -#[async_trait] -impl ScheduleRepository for SqliteScheduleRepository { - async fn find_active( - &self, - channel_id: ChannelId, - at: DateTime, - ) -> DomainResult> { - let at_str = at.to_rfc3339(); - let row: Option = sqlx::query_as( - "SELECT id, channel_id, valid_from, valid_until, generation \ - FROM generated_schedules \ - WHERE channel_id = ? AND valid_from <= ? AND valid_until > ? \ - LIMIT 1", - ) - .bind(channel_id.to_string()) - .bind(&at_str) - .bind(&at_str) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - match row { - None => Ok(None), - Some(r) => { - let slots = self.fetch_slots(&r.id).await?; - Some(map_schedule(r, slots)).transpose() - } - } - } - - async fn find_latest(&self, channel_id: ChannelId) -> DomainResult> { - let row: Option = sqlx::query_as( - "SELECT id, channel_id, valid_from, valid_until, generation \ - FROM generated_schedules \ - WHERE channel_id = ? ORDER BY valid_from DESC LIMIT 1", - ) - .bind(channel_id.to_string()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - match row { - None => Ok(None), - Some(r) => { - let slots = self.fetch_slots(&r.id).await?; - Some(map_schedule(r, slots)).transpose() - } - } - } - - async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { - // Upsert the schedule header - sqlx::query( - r#" - INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - valid_from = excluded.valid_from, - valid_until = excluded.valid_until, - generation = excluded.generation - "#, - ) - .bind(schedule.id.to_string()) - .bind(schedule.channel_id.to_string()) - .bind(schedule.valid_from.to_rfc3339()) - .bind(schedule.valid_until.to_rfc3339()) - .bind(schedule.generation as i64) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - // Replace all slots (delete-then-insert is safe here; schedule saves are - // infrequent and atomic within a single-writer SQLite connection) - sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = ?") - .bind(schedule.id.to_string()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - for slot in &schedule.slots { - let item_json = serde_json::to_string(&slot.item).map_err(|e| { - DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e)) - })?; - - sqlx::query( - r#" - INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id) - VALUES (?, ?, ?, ?, ?, ?) - "#, - ) - .bind(slot.id.to_string()) - .bind(schedule.id.to_string()) - .bind(slot.start_at.to_rfc3339()) - .bind(slot.end_at.to_rfc3339()) - .bind(&item_json) - .bind(slot.source_block_id.to_string()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - } - - Ok(()) - } - - async fn find_playback_history( - &self, - channel_id: ChannelId, - ) -> DomainResult> { - let rows: Vec = sqlx::query_as( - "SELECT id, channel_id, item_id, played_at, generation \ - FROM playback_records WHERE channel_id = ? ORDER BY played_at DESC", - ) - .bind(channel_id.to_string()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - rows.into_iter().map(PlaybackRecord::try_from).collect() - } - - async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { - sqlx::query( - r#" - INSERT INTO playback_records (id, channel_id, item_id, played_at, generation) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(id) DO NOTHING - "#, - ) - .bind(record.id.to_string()) - .bind(record.channel_id.to_string()) - .bind(record.item_id.as_ref()) - .bind(record.played_at.to_rfc3339()) - .bind(record.generation as i64) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - Ok(()) - } -} - -// ============================================================================ -// PostgreSQL adapter -// ============================================================================ - -#[cfg(feature = "postgres")] -pub struct PostgresScheduleRepository { - pool: sqlx::Pool, -} - -#[cfg(feature = "postgres")] -impl PostgresScheduleRepository { - pub fn new(pool: sqlx::Pool) -> Self { - Self { pool } - } - - async fn fetch_slots(&self, schedule_id: &str) -> DomainResult> { - sqlx::query_as( - "SELECT id, schedule_id, start_at, end_at, item, source_block_id \ - FROM scheduled_slots WHERE schedule_id = $1 ORDER BY start_at", - ) - .bind(schedule_id) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string())) - } -} - -#[cfg(feature = "postgres")] -#[async_trait] -impl ScheduleRepository for PostgresScheduleRepository { - async fn find_active( - &self, - channel_id: ChannelId, - at: DateTime, - ) -> DomainResult> { - let at_str = at.to_rfc3339(); - let row: Option = sqlx::query_as( - "SELECT id, channel_id, valid_from, valid_until, generation \ - FROM generated_schedules \ - WHERE channel_id = $1 AND valid_from <= $2 AND valid_until > $3 \ - LIMIT 1", - ) - .bind(channel_id.to_string()) - .bind(&at_str) - .bind(&at_str) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - match row { - None => Ok(None), - Some(r) => { - let slots = self.fetch_slots(&r.id).await?; - Some(map_schedule(r, slots)).transpose() - } - } - } - - async fn find_latest(&self, channel_id: ChannelId) -> DomainResult> { - let row: Option = sqlx::query_as( - "SELECT id, channel_id, valid_from, valid_until, generation \ - FROM generated_schedules \ - WHERE channel_id = $1 ORDER BY valid_from DESC LIMIT 1", - ) - .bind(channel_id.to_string()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - match row { - None => Ok(None), - Some(r) => { - let slots = self.fetch_slots(&r.id).await?; - Some(map_schedule(r, slots)).transpose() - } - } - } - - async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { - sqlx::query( - r#" - INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT(id) DO UPDATE SET - valid_from = EXCLUDED.valid_from, - valid_until = EXCLUDED.valid_until, - generation = EXCLUDED.generation - "#, - ) - .bind(schedule.id.to_string()) - .bind(schedule.channel_id.to_string()) - .bind(schedule.valid_from.to_rfc3339()) - .bind(schedule.valid_until.to_rfc3339()) - .bind(schedule.generation as i64) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = $1") - .bind(schedule.id.to_string()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - for slot in &schedule.slots { - let item_json = serde_json::to_string(&slot.item).map_err(|e| { - DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e)) - })?; - - sqlx::query( - r#" - INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id) - VALUES ($1, $2, $3, $4, $5, $6) - "#, - ) - .bind(slot.id.to_string()) - .bind(schedule.id.to_string()) - .bind(slot.start_at.to_rfc3339()) - .bind(slot.end_at.to_rfc3339()) - .bind(&item_json) - .bind(slot.source_block_id.to_string()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - } - - Ok(()) - } - - async fn find_playback_history( - &self, - channel_id: ChannelId, - ) -> DomainResult> { - let rows: Vec = sqlx::query_as( - "SELECT id, channel_id, item_id, played_at, generation \ - FROM playback_records WHERE channel_id = $1 ORDER BY played_at DESC", - ) - .bind(channel_id.to_string()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - rows.into_iter().map(PlaybackRecord::try_from).collect() - } - - async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { - sqlx::query( - r#" - INSERT INTO playback_records (id, channel_id, item_id, played_at, generation) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT(id) DO NOTHING - "#, - ) - .bind(record.id.to_string()) - .bind(record.channel_id.to_string()) - .bind(record.item_id.as_ref()) - .bind(record.played_at.to_rfc3339()) - .bind(record.generation as i64) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - Ok(()) - } -} diff --git a/k-tv-backend/infra/src/schedule_repository/mapping.rs b/k-tv-backend/infra/src/schedule_repository/mapping.rs new file mode 100644 index 0000000..25c0c1f --- /dev/null +++ b/k-tv-backend/infra/src/schedule_repository/mapping.rs @@ -0,0 +1,109 @@ +use chrono::{DateTime, Utc}; +use sqlx::FromRow; +use uuid::Uuid; + +use domain::{DomainError, GeneratedSchedule, MediaItem, MediaItemId, PlaybackRecord, ScheduledSlot}; + +// ============================================================================ +// Row types +// ============================================================================ + +#[derive(Debug, FromRow)] +pub(super) struct ScheduleRow { + pub id: String, + pub channel_id: String, + pub valid_from: String, + pub valid_until: String, + pub generation: i64, +} + +#[derive(Debug, FromRow)] +pub(super) struct SlotRow { + pub id: String, + // schedule_id selected but only used to drive the JOIN; not needed for domain type + #[allow(dead_code)] + pub schedule_id: String, + pub start_at: String, + pub end_at: String, + pub item: String, + pub source_block_id: String, +} + +#[derive(Debug, FromRow)] +pub(super) struct PlaybackRecordRow { + pub id: String, + pub channel_id: String, + pub item_id: String, + pub played_at: String, + pub generation: i64, +} + +// ============================================================================ +// Mapping +// ============================================================================ + +pub(super) fn parse_dt(s: &str) -> Result, DomainError> { + DateTime::parse_from_rfc3339(s) + .map(|dt| dt.with_timezone(&Utc)) + .or_else(|_| { + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map(|dt| dt.and_utc()) + }) + .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime '{}': {}", s, e))) +} + +pub(super) fn map_slot_row(row: SlotRow) -> Result { + let id = Uuid::parse_str(&row.id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid slot UUID: {}", e)))?; + let source_block_id = Uuid::parse_str(&row.source_block_id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid block UUID: {}", e)))?; + let item: MediaItem = serde_json::from_str(&row.item) + .map_err(|e| DomainError::RepositoryError(format!("Invalid slot item JSON: {}", e)))?; + + Ok(ScheduledSlot { + id, + start_at: parse_dt(&row.start_at)?, + end_at: parse_dt(&row.end_at)?, + item, + source_block_id, + }) +} + +pub(super) fn map_schedule( + row: ScheduleRow, + slot_rows: Vec, +) -> Result { + let id = Uuid::parse_str(&row.id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid schedule UUID: {}", e)))?; + let channel_id = Uuid::parse_str(&row.channel_id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?; + + let slots: Result, _> = slot_rows.into_iter().map(map_slot_row).collect(); + + Ok(GeneratedSchedule { + id, + channel_id, + valid_from: parse_dt(&row.valid_from)?, + valid_until: parse_dt(&row.valid_until)?, + generation: row.generation as u32, + slots: slots?, + }) +} + +impl TryFrom for PlaybackRecord { + type Error = DomainError; + + fn try_from(row: PlaybackRecordRow) -> Result { + let id = Uuid::parse_str(&row.id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; + let channel_id = Uuid::parse_str(&row.channel_id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?; + + Ok(PlaybackRecord { + id, + channel_id, + item_id: MediaItemId::new(row.item_id), + played_at: parse_dt(&row.played_at)?, + generation: row.generation as u32, + }) + } +} diff --git a/k-tv-backend/infra/src/schedule_repository/mod.rs b/k-tv-backend/infra/src/schedule_repository/mod.rs new file mode 100644 index 0000000..53db0cb --- /dev/null +++ b/k-tv-backend/infra/src/schedule_repository/mod.rs @@ -0,0 +1,13 @@ +//! SQLite and PostgreSQL adapters for ScheduleRepository + +mod mapping; + +#[cfg(feature = "sqlite")] +mod sqlite; +#[cfg(feature = "postgres")] +mod postgres; + +#[cfg(feature = "sqlite")] +pub use sqlite::SqliteScheduleRepository; +#[cfg(feature = "postgres")] +pub use postgres::PostgresScheduleRepository; diff --git a/k-tv-backend/infra/src/schedule_repository/postgres.rs b/k-tv-backend/infra/src/schedule_repository/postgres.rs new file mode 100644 index 0000000..0cc93b2 --- /dev/null +++ b/k-tv-backend/infra/src/schedule_repository/postgres.rs @@ -0,0 +1,165 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; + +use domain::{ChannelId, DomainError, DomainResult, GeneratedSchedule, PlaybackRecord, ScheduleRepository}; + +use super::mapping::{map_schedule, PlaybackRecordRow, ScheduleRow, SlotRow}; + +pub struct PostgresScheduleRepository { + pool: sqlx::Pool, +} + +impl PostgresScheduleRepository { + pub fn new(pool: sqlx::Pool) -> Self { + Self { pool } + } + + async fn fetch_slots(&self, schedule_id: &str) -> DomainResult> { + sqlx::query_as( + "SELECT id, schedule_id, start_at, end_at, item, source_block_id \ + FROM scheduled_slots WHERE schedule_id = $1 ORDER BY start_at", + ) + .bind(schedule_id) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string())) + } +} + +#[async_trait] +impl ScheduleRepository for PostgresScheduleRepository { + async fn find_active( + &self, + channel_id: ChannelId, + at: DateTime, + ) -> DomainResult> { + let at_str = at.to_rfc3339(); + let row: Option = sqlx::query_as( + "SELECT id, channel_id, valid_from, valid_until, generation \ + FROM generated_schedules \ + WHERE channel_id = $1 AND valid_from <= $2 AND valid_until > $3 \ + LIMIT 1", + ) + .bind(channel_id.to_string()) + .bind(&at_str) + .bind(&at_str) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + match row { + None => Ok(None), + Some(r) => { + let slots = self.fetch_slots(&r.id).await?; + Some(map_schedule(r, slots)).transpose() + } + } + } + + async fn find_latest(&self, channel_id: ChannelId) -> DomainResult> { + let row: Option = sqlx::query_as( + "SELECT id, channel_id, valid_from, valid_until, generation \ + FROM generated_schedules \ + WHERE channel_id = $1 ORDER BY valid_from DESC LIMIT 1", + ) + .bind(channel_id.to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + match row { + None => Ok(None), + Some(r) => { + let slots = self.fetch_slots(&r.id).await?; + Some(map_schedule(r, slots)).transpose() + } + } + } + + async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { + sqlx::query( + r#" + INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT(id) DO UPDATE SET + valid_from = EXCLUDED.valid_from, + valid_until = EXCLUDED.valid_until, + generation = EXCLUDED.generation + "#, + ) + .bind(schedule.id.to_string()) + .bind(schedule.channel_id.to_string()) + .bind(schedule.valid_from.to_rfc3339()) + .bind(schedule.valid_until.to_rfc3339()) + .bind(schedule.generation as i64) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = $1") + .bind(schedule.id.to_string()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + for slot in &schedule.slots { + let item_json = serde_json::to_string(&slot.item).map_err(|e| { + DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e)) + })?; + + sqlx::query( + r#" + INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id) + VALUES ($1, $2, $3, $4, $5, $6) + "#, + ) + .bind(slot.id.to_string()) + .bind(schedule.id.to_string()) + .bind(slot.start_at.to_rfc3339()) + .bind(slot.end_at.to_rfc3339()) + .bind(&item_json) + .bind(slot.source_block_id.to_string()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + } + + Ok(()) + } + + async fn find_playback_history( + &self, + channel_id: ChannelId, + ) -> DomainResult> { + let rows: Vec = sqlx::query_as( + "SELECT id, channel_id, item_id, played_at, generation \ + FROM playback_records WHERE channel_id = $1 ORDER BY played_at DESC", + ) + .bind(channel_id.to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + rows.into_iter().map(PlaybackRecord::try_from).collect() + } + + async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { + sqlx::query( + r#" + INSERT INTO playback_records (id, channel_id, item_id, played_at, generation) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT(id) DO NOTHING + "#, + ) + .bind(record.id.to_string()) + .bind(record.channel_id.to_string()) + .bind(record.item_id.as_ref()) + .bind(record.played_at.to_rfc3339()) + .bind(record.generation as i64) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(()) + } +} diff --git a/k-tv-backend/infra/src/schedule_repository/sqlite.rs b/k-tv-backend/infra/src/schedule_repository/sqlite.rs new file mode 100644 index 0000000..9fec40d --- /dev/null +++ b/k-tv-backend/infra/src/schedule_repository/sqlite.rs @@ -0,0 +1,168 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; + +use domain::{ChannelId, DomainError, DomainResult, GeneratedSchedule, PlaybackRecord, ScheduleRepository}; + +use super::mapping::{map_schedule, PlaybackRecordRow, ScheduleRow, SlotRow}; + +pub struct SqliteScheduleRepository { + pool: sqlx::SqlitePool, +} + +impl SqliteScheduleRepository { + pub fn new(pool: sqlx::SqlitePool) -> Self { + Self { pool } + } + + async fn fetch_slots(&self, schedule_id: &str) -> DomainResult> { + sqlx::query_as( + "SELECT id, schedule_id, start_at, end_at, item, source_block_id \ + FROM scheduled_slots WHERE schedule_id = ? ORDER BY start_at", + ) + .bind(schedule_id) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string())) + } +} + +#[async_trait] +impl ScheduleRepository for SqliteScheduleRepository { + async fn find_active( + &self, + channel_id: ChannelId, + at: DateTime, + ) -> DomainResult> { + let at_str = at.to_rfc3339(); + let row: Option = sqlx::query_as( + "SELECT id, channel_id, valid_from, valid_until, generation \ + FROM generated_schedules \ + WHERE channel_id = ? AND valid_from <= ? AND valid_until > ? \ + LIMIT 1", + ) + .bind(channel_id.to_string()) + .bind(&at_str) + .bind(&at_str) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + match row { + None => Ok(None), + Some(r) => { + let slots = self.fetch_slots(&r.id).await?; + Some(map_schedule(r, slots)).transpose() + } + } + } + + async fn find_latest(&self, channel_id: ChannelId) -> DomainResult> { + let row: Option = sqlx::query_as( + "SELECT id, channel_id, valid_from, valid_until, generation \ + FROM generated_schedules \ + WHERE channel_id = ? ORDER BY valid_from DESC LIMIT 1", + ) + .bind(channel_id.to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + match row { + None => Ok(None), + Some(r) => { + let slots = self.fetch_slots(&r.id).await?; + Some(map_schedule(r, slots)).transpose() + } + } + } + + async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { + // Upsert the schedule header + sqlx::query( + r#" + INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + valid_from = excluded.valid_from, + valid_until = excluded.valid_until, + generation = excluded.generation + "#, + ) + .bind(schedule.id.to_string()) + .bind(schedule.channel_id.to_string()) + .bind(schedule.valid_from.to_rfc3339()) + .bind(schedule.valid_until.to_rfc3339()) + .bind(schedule.generation as i64) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + // Replace all slots (delete-then-insert is safe here; schedule saves are + // infrequent and atomic within a single-writer SQLite connection) + sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = ?") + .bind(schedule.id.to_string()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + for slot in &schedule.slots { + let item_json = serde_json::to_string(&slot.item).map_err(|e| { + DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e)) + })?; + + sqlx::query( + r#" + INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id) + VALUES (?, ?, ?, ?, ?, ?) + "#, + ) + .bind(slot.id.to_string()) + .bind(schedule.id.to_string()) + .bind(slot.start_at.to_rfc3339()) + .bind(slot.end_at.to_rfc3339()) + .bind(&item_json) + .bind(slot.source_block_id.to_string()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + } + + Ok(()) + } + + async fn find_playback_history( + &self, + channel_id: ChannelId, + ) -> DomainResult> { + let rows: Vec = sqlx::query_as( + "SELECT id, channel_id, item_id, played_at, generation \ + FROM playback_records WHERE channel_id = ? ORDER BY played_at DESC", + ) + .bind(channel_id.to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + rows.into_iter().map(PlaybackRecord::try_from).collect() + } + + async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { + sqlx::query( + r#" + INSERT INTO playback_records (id, channel_id, item_id, played_at, generation) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(id) DO NOTHING + "#, + ) + .bind(record.id.to_string()) + .bind(record.channel_id.to_string()) + .bind(record.item_id.as_ref()) + .bind(record.played_at.to_rfc3339()) + .bind(record.generation as i64) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(()) + } +} diff --git a/k-tv-backend/infra/src/user_repository/mapping.rs b/k-tv-backend/infra/src/user_repository/mapping.rs new file mode 100644 index 0000000..581185f --- /dev/null +++ b/k-tv-backend/infra/src/user_repository/mapping.rs @@ -0,0 +1,42 @@ +use chrono::{DateTime, Utc}; +use sqlx::FromRow; +use uuid::Uuid; + +use domain::{DomainError, Email, User}; + +#[derive(Debug, FromRow)] +pub(super) struct UserRow { + pub id: String, + pub subject: String, + pub email: String, + pub password_hash: Option, + pub created_at: String, +} + +impl TryFrom for User { + type Error = DomainError; + + fn try_from(row: UserRow) -> Result { + let id = Uuid::parse_str(&row.id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; + let created_at = DateTime::parse_from_rfc3339(&row.created_at) + .map(|dt| dt.with_timezone(&Utc)) + .or_else(|_| { + // Fallback for SQLite datetime format + chrono::NaiveDateTime::parse_from_str(&row.created_at, "%Y-%m-%d %H:%M:%S") + .map(|dt| dt.and_utc()) + }) + .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime: {}", e)))?; + + let email = Email::try_from(row.email) + .map_err(|e| DomainError::RepositoryError(format!("Invalid email in DB: {}", e)))?; + + Ok(User::with_id( + id, + row.subject, + email, + row.password_hash, + created_at, + )) + } +} diff --git a/k-tv-backend/infra/src/user_repository/mod.rs b/k-tv-backend/infra/src/user_repository/mod.rs new file mode 100644 index 0000000..cb3204f --- /dev/null +++ b/k-tv-backend/infra/src/user_repository/mod.rs @@ -0,0 +1,13 @@ +//! SQLite and PostgreSQL implementations of UserRepository + +mod mapping; + +#[cfg(feature = "sqlite")] +mod sqlite; +#[cfg(feature = "postgres")] +mod postgres; + +#[cfg(feature = "sqlite")] +pub use sqlite::SqliteUserRepository; +#[cfg(feature = "postgres")] +pub use postgres::PostgresUserRepository; diff --git a/k-tv-backend/infra/src/user_repository/postgres.rs b/k-tv-backend/infra/src/user_repository/postgres.rs new file mode 100644 index 0000000..0a31d26 --- /dev/null +++ b/k-tv-backend/infra/src/user_repository/postgres.rs @@ -0,0 +1,102 @@ +use async_trait::async_trait; +use uuid::Uuid; + +use domain::{DomainError, DomainResult, User, UserRepository}; + +use super::mapping::UserRow; + +/// PostgreSQL adapter for UserRepository +#[derive(Clone)] +pub struct PostgresUserRepository { + pool: sqlx::Pool, +} + +impl PostgresUserRepository { + pub fn new(pool: sqlx::Pool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl UserRepository for PostgresUserRepository { + async fn find_by_id(&self, id: Uuid) -> DomainResult> { + let id_str = id.to_string(); + let row: Option = sqlx::query_as( + "SELECT id, subject, email, password_hash, created_at FROM users WHERE id = $1", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + row.map(User::try_from).transpose() + } + + async fn find_by_subject(&self, subject: &str) -> DomainResult> { + let row: Option = sqlx::query_as( + "SELECT id, subject, email, password_hash, created_at FROM users WHERE subject = $1", + ) + .bind(subject) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + row.map(User::try_from).transpose() + } + + async fn find_by_email(&self, email: &str) -> DomainResult> { + let row: Option = sqlx::query_as( + "SELECT id, subject, email, password_hash, created_at FROM users WHERE email = $1", + ) + .bind(email) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + row.map(User::try_from).transpose() + } + + async fn save(&self, user: &User) -> DomainResult<()> { + let id = user.id.to_string(); + let created_at = user.created_at.to_rfc3339(); + + sqlx::query( + r#" + INSERT INTO users (id, subject, email, password_hash, created_at) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT(id) DO UPDATE SET + subject = excluded.subject, + email = excluded.email, + password_hash = excluded.password_hash + "#, + ) + .bind(&id) + .bind(&user.subject) + .bind(user.email.as_ref()) + .bind(&user.password_hash) + .bind(&created_at) + .execute(&self.pool) + .await + .map_err(|e| { + let msg = e.to_string(); + if msg.contains("unique constraint") || msg.contains("duplicate key") { + DomainError::UserAlreadyExists(user.email.as_ref().to_string()) + } else { + DomainError::RepositoryError(msg) + } + })?; + + Ok(()) + } + + async fn delete(&self, id: Uuid) -> DomainResult<()> { + let id_str = id.to_string(); + sqlx::query("DELETE FROM users WHERE id = $1") + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(()) + } +} diff --git a/k-tv-backend/infra/src/user_repository.rs b/k-tv-backend/infra/src/user_repository/sqlite.rs similarity index 55% rename from k-tv-backend/infra/src/user_repository.rs rename to k-tv-backend/infra/src/user_repository/sqlite.rs index 4038adb..0bf89cb 100644 --- a/k-tv-backend/infra/src/user_repository.rs +++ b/k-tv-backend/infra/src/user_repository/sqlite.rs @@ -1,65 +1,22 @@ -//! SQLite and PostgreSQL implementations of UserRepository - use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use sqlx::FromRow; use uuid::Uuid; -use domain::{DomainError, DomainResult, Email, User, UserRepository}; +use domain::{DomainError, DomainResult, User, UserRepository}; -/// Row type for database query results (shared between SQLite and PostgreSQL) -#[derive(Debug, FromRow)] -struct UserRow { - id: String, - subject: String, - email: String, - password_hash: Option, - created_at: String, -} - -impl TryFrom for User { - type Error = DomainError; - - fn try_from(row: UserRow) -> Result { - let id = Uuid::parse_str(&row.id) - .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; - let created_at = DateTime::parse_from_rfc3339(&row.created_at) - .map(|dt| dt.with_timezone(&Utc)) - .or_else(|_| { - // Fallback for SQLite datetime format - chrono::NaiveDateTime::parse_from_str(&row.created_at, "%Y-%m-%d %H:%M:%S") - .map(|dt| dt.and_utc()) - }) - .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime: {}", e)))?; - - let email = Email::try_from(row.email) - .map_err(|e| DomainError::RepositoryError(format!("Invalid email in DB: {}", e)))?; - - Ok(User::with_id( - id, - row.subject, - email, - row.password_hash, - created_at, - )) - } -} +use super::mapping::UserRow; /// SQLite adapter for UserRepository -#[cfg(feature = "sqlite")] #[derive(Clone)] pub struct SqliteUserRepository { pool: sqlx::SqlitePool, } -#[cfg(feature = "sqlite")] impl SqliteUserRepository { pub fn new(pool: sqlx::SqlitePool) -> Self { Self { pool } } } -#[cfg(feature = "sqlite")] #[async_trait] impl UserRepository for SqliteUserRepository { async fn find_by_id(&self, id: Uuid) -> DomainResult> { @@ -145,9 +102,10 @@ impl UserRepository for SqliteUserRepository { } } -#[cfg(all(test, feature = "sqlite"))] +#[cfg(test)] mod tests { use super::*; + use domain::Email; use crate::db::run_migrations; use k_core::db::{DatabaseConfig, DatabasePool, connect}; @@ -223,102 +181,3 @@ mod tests { assert!(found.is_none()); } } - -/// PostgreSQL adapter for UserRepository -#[cfg(feature = "postgres")] -#[derive(Clone)] -pub struct PostgresUserRepository { - pool: sqlx::Pool, -} - -#[cfg(feature = "postgres")] -impl PostgresUserRepository { - pub fn new(pool: sqlx::Pool) -> Self { - Self { pool } - } -} - -#[cfg(feature = "postgres")] -#[async_trait] -impl UserRepository for PostgresUserRepository { - async fn find_by_id(&self, id: Uuid) -> DomainResult> { - let id_str = id.to_string(); - let row: Option = sqlx::query_as( - "SELECT id, subject, email, password_hash, created_at FROM users WHERE id = $1", - ) - .bind(&id_str) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - row.map(User::try_from).transpose() - } - - async fn find_by_subject(&self, subject: &str) -> DomainResult> { - let row: Option = sqlx::query_as( - "SELECT id, subject, email, password_hash, created_at FROM users WHERE subject = $1", - ) - .bind(subject) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - row.map(User::try_from).transpose() - } - - async fn find_by_email(&self, email: &str) -> DomainResult> { - let row: Option = sqlx::query_as( - "SELECT id, subject, email, password_hash, created_at FROM users WHERE email = $1", - ) - .bind(email) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - row.map(User::try_from).transpose() - } - - async fn save(&self, user: &User) -> DomainResult<()> { - let id = user.id.to_string(); - let created_at = user.created_at.to_rfc3339(); - - sqlx::query( - r#" - INSERT INTO users (id, subject, email, password_hash, created_at) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT(id) DO UPDATE SET - subject = excluded.subject, - email = excluded.email, - password_hash = excluded.password_hash - "#, - ) - .bind(&id) - .bind(&user.subject) - .bind(user.email.as_ref()) - .bind(&user.password_hash) - .bind(&created_at) - .execute(&self.pool) - .await - .map_err(|e| { - let msg = e.to_string(); - if msg.contains("unique constraint") || msg.contains("duplicate key") { - DomainError::UserAlreadyExists(user.email.as_ref().to_string()) - } else { - DomainError::RepositoryError(msg) - } - })?; - - Ok(()) - } - - async fn delete(&self, id: Uuid) -> DomainResult<()> { - let id_str = id.to_string(); - sqlx::query("DELETE FROM users WHERE id = $1") - .bind(&id_str) - .execute(&self.pool) - .await - .map_err(|e| DomainError::RepositoryError(e.to_string()))?; - - Ok(()) - } -}