From 1102e385f3c75829bba5fdb9c3256effcfcfe300 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 15 Mar 2026 00:34:23 +0100 Subject: [PATCH] feat(transcoding): add FFmpeg HLS transcoding support - Introduced `TranscodeManager` for managing on-demand transcoding of local video files. - Added configuration options for transcoding in `Config` and `LocalFilesConfig`. - Implemented new API routes for managing transcoding settings, stats, and cache. - Updated `LocalFilesProvider` to support transcoding capabilities. - Created frontend components for managing transcode settings and displaying stats. - Added database migration for transcode settings. - Enhanced existing routes and DTOs to accommodate new transcoding features. --- .gitignore | 1 + k-tv-backend/Cargo.lock | 1 + k-tv-backend/api/Cargo.toml | 3 +- k-tv-backend/api/src/config.rs | 13 + k-tv-backend/api/src/dto.rs | 20 ++ k-tv-backend/api/src/main.rs | 38 ++- k-tv-backend/api/src/routes/config.rs | 1 + k-tv-backend/api/src/routes/files.rs | 248 +++++++++++++++-- k-tv-backend/api/src/state.rs | 10 + k-tv-backend/domain/src/ports.rs | 2 + k-tv-backend/infra/src/jellyfin/provider.rs | 1 + k-tv-backend/infra/src/lib.rs | 2 +- k-tv-backend/infra/src/local_files/config.rs | 4 + k-tv-backend/infra/src/local_files/mod.rs | 2 + .../infra/src/local_files/provider.rs | 43 ++- .../infra/src/local_files/transcoder.rs | 254 ++++++++++++++++++ k-tv-backend/mcp/src/main.rs | 10 +- .../20260315000000_add_transcode_settings.sql | 5 + .../components/transcode-settings-dialog.tsx | 146 ++++++++++ k-tv-frontend/app/(main)/dashboard/page.tsx | 19 +- k-tv-frontend/hooks/use-transcode.ts | 43 +++ k-tv-frontend/lib/api.ts | 20 ++ k-tv-frontend/lib/types.ts | 10 + 23 files changed, 865 insertions(+), 31 deletions(-) create mode 100644 .gitignore create mode 100644 k-tv-backend/infra/src/local_files/transcoder.rs create mode 100644 k-tv-backend/migrations_sqlite/20260315000000_add_transcode_settings.sql create mode 100644 k-tv-frontend/app/(main)/dashboard/components/transcode-settings-dialog.tsx create mode 100644 k-tv-frontend/hooks/use-transcode.ts diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2564b08 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +transcode/ diff --git a/k-tv-backend/Cargo.lock b/k-tv-backend/Cargo.lock index 745e6b2..e419507 100644 --- a/k-tv-backend/Cargo.lock +++ b/k-tv-backend/Cargo.lock @@ -84,6 +84,7 @@ dependencies = [ "serde", "serde_json", "serde_qs", + "sqlx", "thiserror 2.0.17", "time", "tokio", diff --git a/k-tv-backend/api/Cargo.toml b/k-tv-backend/api/Cargo.toml index a8287ee..d114c93 100644 --- a/k-tv-backend/api/Cargo.toml +++ b/k-tv-backend/api/Cargo.toml @@ -11,7 +11,7 @@ postgres = ["infra/postgres"] auth-oidc = ["infra/auth-oidc"] auth-jwt = ["infra/auth-jwt"] jellyfin = ["infra/jellyfin"] -local-files = ["infra/local-files", "dep:tokio-util"] +local-files = ["infra/local-files", "dep:tokio-util", "dep:sqlx"] [profile.release] strip = true @@ -61,3 +61,4 @@ async-trait = "0.1" dotenvy = "0.15.7" time = "0.3" tokio-util = { version = "0.7", features = ["io"], optional = true } +sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite"], optional = true } diff --git a/k-tv-backend/api/src/config.rs b/k-tv-backend/api/src/config.rs index f8051d7..0ed73c2 100644 --- a/k-tv-backend/api/src/config.rs +++ b/k-tv-backend/api/src/config.rs @@ -44,6 +44,11 @@ pub struct Config { /// Root directory for the local-files provider. Set `LOCAL_FILES_DIR` to enable. pub local_files_dir: Option, + /// Directory for FFmpeg HLS transcode cache. Set `TRANSCODE_DIR` to enable transcoding. + pub transcode_dir: Option, + /// How long (hours) to keep transcode cache entries before cleanup. Default 24. + pub transcode_cleanup_ttl_hours: u32, + /// Public base URL of this API server (used to build IPTV stream URLs). pub base_url: String, } @@ -120,6 +125,12 @@ impl Config { let local_files_dir = env::var("LOCAL_FILES_DIR").ok().map(PathBuf::from); + let transcode_dir = env::var("TRANSCODE_DIR").ok().map(PathBuf::from); + let transcode_cleanup_ttl_hours = env::var("TRANSCODE_CLEANUP_TTL_HOURS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(24); + let base_url = env::var("BASE_URL") .unwrap_or_else(|_| format!("http://localhost:{}", port)); @@ -147,6 +158,8 @@ impl Config { jellyfin_api_key, jellyfin_user_id, local_files_dir, + transcode_dir, + transcode_cleanup_ttl_hours, base_url, } } diff --git a/k-tv-backend/api/src/dto.rs b/k-tv-backend/api/src/dto.rs index 30aa8ac..845d541 100644 --- a/k-tv-backend/api/src/dto.rs +++ b/k-tv-backend/api/src/dto.rs @@ -235,6 +235,26 @@ pub struct ScheduleResponse { pub slots: Vec, } +// ============================================================================ +// Transcode DTOs +// ============================================================================ + +#[derive(Debug, Serialize)] +pub struct TranscodeSettingsResponse { + pub cleanup_ttl_hours: u32, +} + +#[derive(Debug, Deserialize)] +pub struct UpdateTranscodeSettingsRequest { + pub cleanup_ttl_hours: u32, +} + +#[derive(Debug, Serialize)] +pub struct TranscodeStatsResponse { + pub cache_size_bytes: u64, + pub item_count: usize, +} + impl From for ScheduleResponse { fn from(s: domain::GeneratedSchedule) -> Self { Self { diff --git a/k-tv-backend/api/src/main.rs b/k-tv-backend/api/src/main.rs index 57556ab..99fe37a 100644 --- a/k-tv-backend/api/src/main.rs +++ b/k-tv-backend/api/src/main.rs @@ -75,6 +75,10 @@ async fn main() -> anyhow::Result<()> { // Build provider registry — all configured providers are registered simultaneously. #[cfg(feature = "local-files")] let mut local_index: Option> = None; + #[cfg(feature = "local-files")] + let mut transcode_manager: Option> = None; + #[cfg(feature = "local-files")] + let mut sqlite_pool_for_state: Option = None; let mut registry = infra::ProviderRegistry::new(); @@ -99,12 +103,41 @@ async fn main() -> anyhow::Result<()> { let lf_cfg = infra::LocalFilesConfig { root_dir: dir.clone(), base_url: config.base_url.clone(), + transcode_dir: config.transcode_dir.clone(), + cleanup_ttl_hours: config.transcode_cleanup_ttl_hours, }; let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); local_index = Some(Arc::clone(&idx)); let scan_idx = Arc::clone(&idx); tokio::spawn(async move { scan_idx.rescan().await; }); - registry.register("local", Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg))); + + // Build TranscodeManager if TRANSCODE_DIR is set. + let tm = config.transcode_dir.as_ref().map(|td| { + std::fs::create_dir_all(td).ok(); + tracing::info!("Transcoding enabled; cache dir: {:?}", td); + let tm = infra::TranscodeManager::new(td.clone(), config.transcode_cleanup_ttl_hours); + // Load persisted TTL from DB. + let tm_clone = Arc::clone(&tm); + let pool_clone = sqlite_pool.clone(); + tokio::spawn(async move { + if let Ok(row) = sqlx::query_as::<_, (i64,)>( + "SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1", + ) + .fetch_one(&pool_clone) + .await + { + tm_clone.set_cleanup_ttl(row.0 as u32); + } + }); + tm + }); + + registry.register( + "local", + Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg, tm.clone())), + ); + transcode_manager = tm; + sqlite_pool_for_state = Some(sqlite_pool.clone()); } else { tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR"); } @@ -137,6 +170,8 @@ async fn main() -> anyhow::Result<()> { #[cfg(feature = "local-files")] { state.local_index = local_index; + state.transcode_manager = transcode_manager; + state.sqlite_pool = sqlite_pool_for_state; } let server_config = ServerConfig { @@ -206,6 +241,7 @@ impl IMediaProvider for NoopMediaProvider { search: false, streaming_protocol: StreamingProtocol::DirectFile, rescan: false, + transcode: false, } } diff --git a/k-tv-backend/api/src/routes/config.rs b/k-tv-backend/api/src/routes/config.rs index 0bed44c..aca3f04 100644 --- a/k-tv-backend/api/src/routes/config.rs +++ b/k-tv-backend/api/src/routes/config.rs @@ -33,6 +33,7 @@ async fn get_config(State(state): State) -> Json { search: false, streaming_protocol: StreamingProtocol::DirectFile, rescan: false, + transcode: false, }); Json(ConfigResponse { diff --git a/k-tv-backend/api/src/routes/files.rs b/k-tv-backend/api/src/routes/files.rs index e69ae74..90eaf64 100644 --- a/k-tv-backend/api/src/routes/files.rs +++ b/k-tv-backend/api/src/routes/files.rs @@ -1,30 +1,58 @@ -//! Local-file streaming and rescan routes +//! Local-file streaming, rescan, and transcode routes. //! -//! GET /files/stream/:encoded_id — serve a local video file with Range support -//! POST /files/rescan — trigger an index rebuild (auth required) +//! GET /files/stream/:id — Range streaming (no auth) +//! POST /files/rescan — index rebuild (auth required) +//! GET /files/transcode/:id/playlist.m3u8 — trigger transcode + serve playlist +//! GET /files/transcode/:id/:segment — serve .ts / .m3u8 segment +//! GET /files/transcode-settings — read TTL (auth) +//! PUT /files/transcode-settings — update TTL (auth) +//! GET /files/transcode-stats — cache size (auth) +//! DELETE /files/transcode-cache — clear cache (auth) use axum::{ Router, extract::{Path, State}, - http::{HeaderMap, StatusCode}, + http::HeaderMap, response::Response, - routing::{get, post}, + routing::get, }; -use crate::{error::ApiError, extractors::CurrentUser, state::AppState}; +use crate::{error::ApiError, state::AppState}; + +#[cfg(feature = "local-files")] +use axum::{ + Json, + http::StatusCode, + routing::{delete, post}, +}; +#[cfg(feature = "local-files")] +use serde::Deserialize; +#[cfg(feature = "local-files")] +use crate::{ + dto::{TranscodeSettingsResponse, TranscodeStatsResponse, UpdateTranscodeSettingsRequest}, + extractors::CurrentUser, +}; pub fn router() -> Router { let r = Router::new().route("/stream/{id}", get(stream_file)); #[cfg(feature = "local-files")] - let r = r.route("/rescan", post(trigger_rescan)); + let r = r + .route("/rescan", post(trigger_rescan)) + .route("/transcode/{id}/playlist.m3u8", get(transcode_playlist)) + .route("/transcode/{id}/{segment}", get(transcode_segment)) + .route( + "/transcode-settings", + get(get_transcode_settings).put(update_transcode_settings), + ) + .route("/transcode-stats", get(get_transcode_stats)) + .route("/transcode-cache", delete(clear_transcode_cache)); r } -/// Stream a local video file, honouring `Range` headers for seeking. -/// -/// The path segment is a base64url-encoded relative path produced by the -/// `LocalFilesProvider`. No authentication required — the ID is not guessable -/// without knowing the filesystem layout. +// ============================================================================ +// Direct streaming +// ============================================================================ + async fn stream_file( State(state): State, Path(encoded_id): Path, @@ -44,7 +72,6 @@ async fn stream_file( let rel = infra::local_files::decode_stream_id(&encoded_id) .ok_or_else(|| ApiError::validation("invalid stream id"))?; - // Security: canonicalise and verify the path stays inside root. let full_path = root_dir.join(&rel); let canonical_root = root_dir .canonicalize() @@ -72,7 +99,6 @@ async fn stream_file( .to_lowercase(); let content_type = content_type_for_ext(&ext); - // Parse Range header. let range = headers .get(axum::http::header::RANGE) .and_then(|v| v.to_str().ok()) @@ -112,20 +138,208 @@ async fn stream_file( Err(ApiError::not_implemented("local-files feature not enabled")) } -/// Trigger a filesystem rescan and return the number of items found. +// ============================================================================ +// Rescan +// ============================================================================ + #[cfg(feature = "local-files")] async fn trigger_rescan( State(state): State, CurrentUser(_user): CurrentUser, -) -> Result, ApiError> { +) -> Result, ApiError> { let index = state .local_index .as_ref() .ok_or_else(|| ApiError::not_implemented("no local files provider active"))?; let count = index.rescan().await; - Ok(axum::Json(serde_json::json!({ "items_found": count }))) + Ok(Json(serde_json::json!({ "items_found": count }))) } +// ============================================================================ +// Transcode endpoints +// ============================================================================ + +#[cfg(feature = "local-files")] +async fn transcode_playlist( + State(state): State, + Path(id): Path, +) -> Result { + let tm = state + .transcode_manager + .as_ref() + .ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?; + + let root = state.config.local_files_dir.as_ref().ok_or_else(|| { + ApiError::not_implemented("LOCAL_FILES_DIR not configured") + })?; + + let rel = infra::local_files::decode_stream_id(&id) + .ok_or_else(|| ApiError::validation("invalid item id"))?; + let src = root.join(&rel); + + tm.ensure_transcoded(&id, &src) + .await + .map_err(|e| ApiError::internal(e.to_string()))?; + + let playlist_path = tm.transcode_dir.join(&id).join("playlist.m3u8"); + let content = tokio::fs::read_to_string(&playlist_path) + .await + .map_err(|e| ApiError::internal(e.to_string()))?; + + Response::builder() + .status(200) + .header("Content-Type", "application/vnd.apple.mpegurl") + .header("Cache-Control", "no-cache") + .body(axum::body::Body::from(content)) + .map_err(|e| ApiError::internal(e.to_string())) +} + +#[derive(Deserialize)] +#[cfg(feature = "local-files")] +struct TranscodeSegmentPath { + id: String, + segment: String, +} + +#[cfg(feature = "local-files")] +async fn transcode_segment( + State(state): State, + Path(params): Path, +) -> Result { + let TranscodeSegmentPath { id, segment } = params; + + let ext = std::path::Path::new(&segment) + .extension() + .and_then(|e| e.to_str()) + .unwrap_or(""); + if ext != "ts" && ext != "m3u8" { + return Err(ApiError::not_found("invalid segment extension")); + } + if segment.contains('/') || segment.contains("..") { + return Err(ApiError::Forbidden("invalid segment path".into())); + } + + let tm = state + .transcode_manager + .as_ref() + .ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?; + + let file_path = tm.transcode_dir.join(&id).join(&segment); + let canonical_base = tm + .transcode_dir + .canonicalize() + .map_err(|e| ApiError::internal(e.to_string()))?; + let canonical_file = file_path + .canonicalize() + .map_err(|_| ApiError::not_found("segment not found"))?; + if !canonical_file.starts_with(&canonical_base) { + return Err(ApiError::Forbidden("path traversal detected".into())); + } + + let content = tokio::fs::read(&canonical_file) + .await + .map_err(|_| ApiError::not_found("segment not found"))?; + + let content_type = if ext == "ts" { + "video/mp2t" + } else { + "application/vnd.apple.mpegurl" + }; + + Response::builder() + .status(200) + .header("Content-Type", content_type) + .body(axum::body::Body::from(content)) + .map_err(|e| ApiError::internal(e.to_string())) +} + +// ============================================================================ +// Transcode settings / stats / cache management +// ============================================================================ + +#[cfg(feature = "local-files")] +async fn get_transcode_settings( + State(state): State, + CurrentUser(_user): CurrentUser, +) -> Result, ApiError> { + let pool = state + .sqlite_pool + .as_ref() + .ok_or_else(|| ApiError::not_implemented("sqlite not available"))?; + + let (ttl,): (i64,) = + sqlx::query_as("SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1") + .fetch_one(pool) + .await + .map_err(|e| ApiError::internal(e.to_string()))?; + + Ok(Json(TranscodeSettingsResponse { + cleanup_ttl_hours: ttl as u32, + })) +} + +#[cfg(feature = "local-files")] +async fn update_transcode_settings( + State(state): State, + CurrentUser(_user): CurrentUser, + Json(req): Json, +) -> Result, ApiError> { + let pool = state + .sqlite_pool + .as_ref() + .ok_or_else(|| ApiError::not_implemented("sqlite not available"))?; + + let ttl = req.cleanup_ttl_hours as i64; + sqlx::query("UPDATE transcode_settings SET cleanup_ttl_hours = ? WHERE id = 1") + .bind(ttl) + .execute(pool) + .await + .map_err(|e| ApiError::internal(e.to_string()))?; + + if let Some(tm) = &state.transcode_manager { + tm.set_cleanup_ttl(req.cleanup_ttl_hours); + } + + Ok(Json(TranscodeSettingsResponse { + cleanup_ttl_hours: req.cleanup_ttl_hours, + })) +} + +#[cfg(feature = "local-files")] +async fn get_transcode_stats( + State(state): State, + CurrentUser(_user): CurrentUser, +) -> Result, ApiError> { + let tm = state + .transcode_manager + .as_ref() + .ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?; + let (cache_size_bytes, item_count) = tm.cache_stats().await; + Ok(Json(TranscodeStatsResponse { + cache_size_bytes, + item_count, + })) +} + +#[cfg(feature = "local-files")] +async fn clear_transcode_cache( + State(state): State, + CurrentUser(_user): CurrentUser, +) -> Result { + let tm = state + .transcode_manager + .as_ref() + .ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?; + tm.clear_cache() + .await + .map_err(|e| ApiError::internal(e.to_string()))?; + Ok(StatusCode::NO_CONTENT) +} + +// ============================================================================ +// Helpers +// ============================================================================ + fn content_type_for_ext(ext: &str) -> &'static str { match ext { "mp4" | "m4v" => "video/mp4", diff --git a/k-tv-backend/api/src/state.rs b/k-tv-backend/api/src/state.rs index 224c624..8298a62 100644 --- a/k-tv-backend/api/src/state.rs +++ b/k-tv-backend/api/src/state.rs @@ -28,6 +28,12 @@ pub struct AppState { /// Index for the local-files provider, used by the rescan route. #[cfg(feature = "local-files")] pub local_index: Option>, + /// TranscodeManager for FFmpeg HLS transcoding (requires TRANSCODE_DIR). + #[cfg(feature = "local-files")] + pub transcode_manager: Option>, + /// SQLite pool for transcode settings CRUD. + #[cfg(feature = "local-files")] + pub sqlite_pool: Option, } impl AppState { @@ -110,6 +116,10 @@ impl AppState { config: Arc::new(config), #[cfg(feature = "local-files")] local_index: None, + #[cfg(feature = "local-files")] + transcode_manager: None, + #[cfg(feature = "local-files")] + sqlite_pool: None, }) } } diff --git a/k-tv-backend/domain/src/ports.rs b/k-tv-backend/domain/src/ports.rs index 9bd9a5f..1a7f568 100644 --- a/k-tv-backend/domain/src/ports.rs +++ b/k-tv-backend/domain/src/ports.rs @@ -54,6 +54,8 @@ pub struct ProviderCapabilities { pub streaming_protocol: StreamingProtocol, /// Whether `POST /files/rescan` is available. pub rescan: bool, + /// Whether on-demand FFmpeg transcoding to HLS is available. + pub transcode: bool, } // ============================================================================ diff --git a/k-tv-backend/infra/src/jellyfin/provider.rs b/k-tv-backend/infra/src/jellyfin/provider.rs index ec20a7f..74d2ede 100644 --- a/k-tv-backend/infra/src/jellyfin/provider.rs +++ b/k-tv-backend/infra/src/jellyfin/provider.rs @@ -139,6 +139,7 @@ impl IMediaProvider for JellyfinMediaProvider { search: true, streaming_protocol: StreamingProtocol::Hls, rescan: false, + transcode: false, } } diff --git a/k-tv-backend/infra/src/lib.rs b/k-tv-backend/infra/src/lib.rs index 61aaf2d..c8e1125 100644 --- a/k-tv-backend/infra/src/lib.rs +++ b/k-tv-backend/infra/src/lib.rs @@ -40,4 +40,4 @@ pub use schedule_repository::SqliteScheduleRepository; pub use jellyfin::{JellyfinConfig, JellyfinMediaProvider}; #[cfg(feature = "local-files")] -pub use local_files::{LocalFilesConfig, LocalFilesProvider, LocalIndex, decode_stream_id}; +pub use local_files::{LocalFilesConfig, LocalFilesProvider, LocalIndex, TranscodeManager, decode_stream_id}; diff --git a/k-tv-backend/infra/src/local_files/config.rs b/k-tv-backend/infra/src/local_files/config.rs index 0246ba3..f34e746 100644 --- a/k-tv-backend/infra/src/local_files/config.rs +++ b/k-tv-backend/infra/src/local_files/config.rs @@ -6,4 +6,8 @@ pub struct LocalFilesConfig { pub root_dir: PathBuf, /// Public base URL of this API server, used to build stream URLs. pub base_url: String, + /// Directory for FFmpeg HLS transcode cache. `None` disables transcoding. + pub transcode_dir: Option, + /// How long (hours) to keep transcode cache entries. Passed to TranscodeManager. + pub cleanup_ttl_hours: u32, } diff --git a/k-tv-backend/infra/src/local_files/mod.rs b/k-tv-backend/infra/src/local_files/mod.rs index 1e12d66..0cce7cc 100644 --- a/k-tv-backend/infra/src/local_files/mod.rs +++ b/k-tv-backend/infra/src/local_files/mod.rs @@ -2,7 +2,9 @@ pub mod config; pub mod index; pub mod provider; pub mod scanner; +pub mod transcoder; pub use config::LocalFilesConfig; pub use index::LocalIndex; pub use provider::{LocalFilesProvider, decode_stream_id}; +pub use transcoder::TranscodeManager; diff --git a/k-tv-backend/infra/src/local_files/provider.rs b/k-tv-backend/infra/src/local_files/provider.rs index df85f87..89c53aa 100644 --- a/k-tv-backend/infra/src/local_files/provider.rs +++ b/k-tv-backend/infra/src/local_files/provider.rs @@ -9,19 +9,26 @@ use domain::{ use super::config::LocalFilesConfig; use super::index::{LocalIndex, decode_id}; use super::scanner::LocalFileItem; +use super::transcoder::TranscodeManager; pub struct LocalFilesProvider { pub index: Arc, base_url: String, + transcode_manager: Option>, } const SHORT_DURATION_SECS: u32 = 1200; // 20 minutes impl LocalFilesProvider { - pub fn new(index: Arc, config: LocalFilesConfig) -> Self { + pub fn new( + index: Arc, + config: LocalFilesConfig, + transcode_manager: Option>, + ) -> Self { Self { index, base_url: config.base_url.trim_end_matches('/').to_string(), + transcode_manager, } } } @@ -57,8 +64,13 @@ impl IMediaProvider for LocalFilesProvider { tags: true, decade: true, search: true, - streaming_protocol: StreamingProtocol::DirectFile, + streaming_protocol: if self.transcode_manager.is_some() { + StreamingProtocol::Hls + } else { + StreamingProtocol::DirectFile + }, rescan: true, + transcode: self.transcode_manager.is_some(), } } @@ -138,12 +150,27 @@ impl IMediaProvider for LocalFilesProvider { .map(|item| to_media_item(item_id.clone(), &item))) } - async fn get_stream_url(&self, item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult { - Ok(format!( - "{}/api/v1/files/stream/{}", - self.base_url, - item_id.as_ref() - )) + async fn get_stream_url(&self, item_id: &MediaItemId, quality: &StreamQuality) -> DomainResult { + match quality { + StreamQuality::Transcode(_) if self.transcode_manager.is_some() => { + let tm = self.transcode_manager.as_ref().unwrap(); + let rel = decode_id(item_id).ok_or_else(|| { + DomainError::InfrastructureError("invalid item id encoding".into()) + })?; + let src = self.index.root_dir.join(&rel); + tm.ensure_transcoded(item_id.as_ref(), &src).await?; + Ok(format!( + "{}/api/v1/files/transcode/{}/playlist.m3u8", + self.base_url, + item_id.as_ref() + )) + } + _ => Ok(format!( + "{}/api/v1/files/stream/{}", + self.base_url, + item_id.as_ref() + )), + } } async fn list_collections(&self) -> DomainResult> { diff --git a/k-tv-backend/infra/src/local_files/transcoder.rs b/k-tv-backend/infra/src/local_files/transcoder.rs new file mode 100644 index 0000000..7286367 --- /dev/null +++ b/k-tv-backend/infra/src/local_files/transcoder.rs @@ -0,0 +1,254 @@ +//! FFmpeg HLS transcoder for local video files. +//! +//! `TranscodeManager` orchestrates on-demand transcoding: the first request for +//! an item spawns an ffmpeg process and returns once the initial HLS playlist +//! appears. Concurrent requests for the same item subscribe to a watch channel +//! and wait without spawning duplicate processes. Transcoded segments are cached +//! in `transcode_dir/{item_id}/` and cleaned up by a background task. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::{ + Arc, + atomic::{AtomicU32, Ordering}, +}; +use std::time::{Duration, Instant}; + +use tokio::sync::{Mutex, watch}; +use tracing::{info, warn, error}; + +use domain::{DomainError, DomainResult}; + +// ============================================================================ +// Types +// ============================================================================ + +#[derive(Clone, Debug)] +pub enum TranscodeStatus { + Ready, + Failed(String), +} + +// ============================================================================ +// Manager +// ============================================================================ + +pub struct TranscodeManager { + pub transcode_dir: PathBuf, + cleanup_ttl_hours: Arc, + active: Arc>>>>, +} + +impl TranscodeManager { + pub fn new(transcode_dir: PathBuf, cleanup_ttl_hours: u32) -> Arc { + let mgr = Arc::new(Self { + transcode_dir, + cleanup_ttl_hours: Arc::new(AtomicU32::new(cleanup_ttl_hours)), + active: Arc::new(Mutex::new(HashMap::new())), + }); + // Background cleanup task — uses Weak to avoid keeping manager alive. + let weak = Arc::downgrade(&mgr); + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(3600)); + loop { + interval.tick().await; + match weak.upgrade() { + Some(m) => m.run_cleanup().await, + None => break, + } + } + }); + mgr + } + + /// Update the cleanup TTL (also persisted to DB by the route handler). + pub fn set_cleanup_ttl(&self, hours: u32) { + self.cleanup_ttl_hours.store(hours, Ordering::Relaxed); + } + + pub fn get_cleanup_ttl(&self) -> u32 { + self.cleanup_ttl_hours.load(Ordering::Relaxed) + } + + /// Ensure `item_id` has been transcoded to HLS. Blocks until the initial + /// playlist appears or an error occurs. Concurrent callers share the result. + pub async fn ensure_transcoded(&self, item_id: &str, src_path: &Path) -> DomainResult<()> { + let out_dir = self.transcode_dir.join(item_id); + let playlist = out_dir.join("playlist.m3u8"); + + if playlist.exists() { + return Ok(()); + } + + let mut rx = { + let mut map = self.active.lock().await; + if let Some(tx) = map.get(item_id) { + tx.subscribe() + } else { + let (tx, rx) = watch::channel::>(None); + map.insert(item_id.to_string(), tx.clone()); + + let item_id_owned = item_id.to_string(); + let src_owned = src_path.to_path_buf(); + let out_dir_owned = out_dir.clone(); + let playlist_owned = playlist.clone(); + let active_ref = Arc::clone(&self.active); + + tokio::spawn(async move { + let _ = tokio::fs::create_dir_all(&out_dir_owned).await; + let status = do_transcode(&src_owned, &out_dir_owned, &playlist_owned).await; + if matches!(status, TranscodeStatus::Ready) { + info!("transcode ready: {}", item_id_owned); + } else if let TranscodeStatus::Failed(ref e) = status { + error!("transcode failed for {}: {}", item_id_owned, e); + } + let _ = tx.send(Some(status)); + active_ref.lock().await.remove(&item_id_owned); + }); + + rx + } + }; + + // Wait for Ready or Failed. + loop { + rx.changed().await.map_err(|_| { + DomainError::InfrastructureError("transcode task dropped unexpectedly".into()) + })?; + if let Some(status) = &*rx.borrow() { + return match status { + TranscodeStatus::Ready => Ok(()), + TranscodeStatus::Failed(e) => Err(DomainError::InfrastructureError( + format!("transcode failed: {}", e), + )), + }; + } + } + } + + /// Remove all cached transcode directories. + pub async fn clear_cache(&self) -> std::io::Result<()> { + if self.transcode_dir.exists() { + tokio::fs::remove_dir_all(&self.transcode_dir).await?; + } + tokio::fs::create_dir_all(&self.transcode_dir).await + } + + /// Return `(total_bytes, item_count)` for the cache directory. + pub async fn cache_stats(&self) -> (u64, usize) { + let mut total_bytes = 0u64; + let mut item_count = 0usize; + let Ok(mut entries) = tokio::fs::read_dir(&self.transcode_dir).await else { + return (0, 0); + }; + while let Ok(Some(entry)) = entries.next_entry().await { + if !entry.path().is_dir() { + continue; + } + item_count += 1; + if let Ok(mut sub) = tokio::fs::read_dir(entry.path()).await { + while let Ok(Some(f)) = sub.next_entry().await { + if let Ok(meta) = f.metadata().await { + total_bytes += meta.len(); + } + } + } + } + (total_bytes, item_count) + } + + async fn run_cleanup(&self) { + let ttl_hours = self.cleanup_ttl_hours.load(Ordering::Relaxed) as u64; + let ttl = Duration::from_secs(ttl_hours * 3600); + let now = std::time::SystemTime::now(); + + let Ok(mut entries) = tokio::fs::read_dir(&self.transcode_dir).await else { + return; + }; + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + if !path.is_dir() { + continue; + } + let playlist = path.join("playlist.m3u8"); + if let Ok(meta) = tokio::fs::metadata(&playlist).await { + if let Ok(modified) = meta.modified() { + if let Ok(age) = now.duration_since(modified) { + if age > ttl { + warn!("cleanup: removing stale transcode {:?}", path); + let _ = tokio::fs::remove_dir_all(&path).await; + } + } + } + } + } + } +} + +// ============================================================================ +// FFmpeg helper +// ============================================================================ + +async fn do_transcode(src: &Path, out_dir: &Path, playlist: &Path) -> TranscodeStatus { + let segment_pattern = out_dir.join("seg%05d.ts"); + + let mut child = match tokio::process::Command::new("ffmpeg") + .args([ + "-i", + src.to_str().unwrap_or(""), + "-c:v", + "libx264", + "-preset", + "fast", + "-crf", + "23", + "-c:a", + "aac", + "-b:a", + "128k", + "-hls_time", + "6", + "-hls_list_size", + "0", + "-hls_flags", + "independent_segments", + "-hls_segment_filename", + segment_pattern.to_str().unwrap_or(""), + playlist.to_str().unwrap_or(""), + ]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + { + Ok(c) => c, + Err(e) => return TranscodeStatus::Failed(format!("ffmpeg spawn error: {}", e)), + }; + + // Poll for playlist.m3u8 — it appears after the first segment is written, + // allowing the client to start playback before transcoding is complete. + let start = Instant::now(); + let timeout = Duration::from_secs(60); + loop { + if playlist.exists() { + return TranscodeStatus::Ready; + } + if start.elapsed() > timeout { + let _ = child.kill().await; + return TranscodeStatus::Failed("timeout waiting for transcode to start".into()); + } + match child.try_wait() { + Ok(Some(status)) => { + return if playlist.exists() { + TranscodeStatus::Ready + } else if status.success() { + TranscodeStatus::Failed("ffmpeg exited but produced no playlist".into()) + } else { + TranscodeStatus::Failed("ffmpeg exited with non-zero status".into()) + }; + } + Err(e) => return TranscodeStatus::Failed(e.to_string()), + Ok(None) => {} + } + tokio::time::sleep(Duration::from_millis(100)).await; + } +} diff --git a/k-tv-backend/mcp/src/main.rs b/k-tv-backend/mcp/src/main.rs index c271413..0fd7f1d 100644 --- a/k-tv-backend/mcp/src/main.rs +++ b/k-tv-backend/mcp/src/main.rs @@ -90,11 +90,16 @@ async fn main() -> anyhow::Result<()> { if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool { let base_url = std::env::var("BASE_URL") .unwrap_or_else(|_| "http://localhost:3000".to_string()); - let lf_cfg = infra::LocalFilesConfig { root_dir: dir, base_url }; + let lf_cfg = infra::LocalFilesConfig { + root_dir: dir, + base_url, + transcode_dir: None, + cleanup_ttl_hours: 24, + }; let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); let scan_idx = Arc::clone(&idx); tokio::spawn(async move { scan_idx.rescan().await; }); - registry.register("local", Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg))); + registry.register("local", Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg, None))); } } @@ -145,6 +150,7 @@ impl IMediaProvider for NoopMediaProvider { search: false, streaming_protocol: StreamingProtocol::DirectFile, rescan: false, + transcode: false, } } diff --git a/k-tv-backend/migrations_sqlite/20260315000000_add_transcode_settings.sql b/k-tv-backend/migrations_sqlite/20260315000000_add_transcode_settings.sql new file mode 100644 index 0000000..b00a1de --- /dev/null +++ b/k-tv-backend/migrations_sqlite/20260315000000_add_transcode_settings.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS transcode_settings ( + id INTEGER PRIMARY KEY CHECK (id = 1), + cleanup_ttl_hours INTEGER NOT NULL DEFAULT 24 +); +INSERT OR IGNORE INTO transcode_settings (id, cleanup_ttl_hours) VALUES (1, 24); diff --git a/k-tv-frontend/app/(main)/dashboard/components/transcode-settings-dialog.tsx b/k-tv-frontend/app/(main)/dashboard/components/transcode-settings-dialog.tsx new file mode 100644 index 0000000..07b2ab2 --- /dev/null +++ b/k-tv-frontend/app/(main)/dashboard/components/transcode-settings-dialog.tsx @@ -0,0 +1,146 @@ +"use client"; + +import { useState, useEffect } from "react"; +import { Trash2 } from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { + Dialog, + DialogContent, + DialogHeader, + DialogTitle, + DialogFooter, +} from "@/components/ui/dialog"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { + useTranscodeSettings, + useUpdateTranscodeSettings, + useTranscodeStats, + useClearTranscodeCache, +} from "@/hooks/use-transcode"; +import { toast } from "sonner"; + +interface Props { + open: boolean; + onOpenChange: (open: boolean) => void; +} + +function fmtBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`; + if (bytes < 1024 * 1024 * 1024) + return `${(bytes / (1024 * 1024)).toFixed(1)} MB`; + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GB`; +} + +export function TranscodeSettingsDialog({ open, onOpenChange }: Props) { + const { data: settings } = useTranscodeSettings(); + const { data: stats } = useTranscodeStats(); + const updateSettings = useUpdateTranscodeSettings(); + const clearCache = useClearTranscodeCache(); + + const [ttl, setTtl] = useState(24); + const [confirmClear, setConfirmClear] = useState(false); + + useEffect(() => { + if (settings) setTtl(settings.cleanup_ttl_hours); + }, [settings]); + + const handleSave = () => { + updateSettings.mutate( + { cleanup_ttl_hours: ttl }, + { + onSuccess: () => toast.success("Settings saved"), + onError: () => toast.error("Failed to save settings"), + }, + ); + }; + + const handleClear = () => { + if (!confirmClear) { + setConfirmClear(true); + return; + } + clearCache.mutate(undefined, { + onSuccess: () => { + toast.success("Transcode cache cleared"); + setConfirmClear(false); + }, + onError: () => toast.error("Failed to clear cache"), + }); + }; + + return ( + + + + Transcode Settings + + +
+ {/* TTL */} +
+ + setTtl(Number(e.target.value))} + className="w-32 bg-zinc-800 border-zinc-700 text-zinc-100" + /> +

+ Transcoded segments older than this are removed automatically. +

+
+ + {/* Stats */} +
+

Cache

+

+ {stats ? fmtBytes(stats.cache_size_bytes) : "—"}{" "} + + ({stats ? stats.item_count : "—"} items) + +

+
+ + {/* Clear cache */} + + {confirmClear && ( +

setConfirmClear(false)} + > + Cancel +

+ )} +
+ + + + + +
+
+ ); +} diff --git a/k-tv-frontend/app/(main)/dashboard/page.tsx b/k-tv-frontend/app/(main)/dashboard/page.tsx index a976540..6a4de7d 100644 --- a/k-tv-frontend/app/(main)/dashboard/page.tsx +++ b/k-tv-frontend/app/(main)/dashboard/page.tsx @@ -1,7 +1,7 @@ "use client"; import { useState, useEffect } from "react"; -import { Plus, Upload, RefreshCw, Antenna } from "lucide-react"; +import { Plus, Upload, RefreshCw, Antenna, Settings2 } from "lucide-react"; import { Button } from "@/components/ui/button"; import { useChannels, @@ -26,6 +26,7 @@ import { type ChannelImportData, } from "./components/import-channel-dialog"; import { IptvExportDialog } from "./components/iptv-export-dialog"; +import { TranscodeSettingsDialog } from "./components/transcode-settings-dialog"; import type { ChannelResponse, ProgrammingBlock, @@ -112,6 +113,7 @@ export default function DashboardPage() { }; const [iptvOpen, setIptvOpen] = useState(false); + const [transcodeOpen, setTranscodeOpen] = useState(false); const [createOpen, setCreateOpen] = useState(false); const [importOpen, setImportOpen] = useState(false); const [importPending, setImportPending] = useState(false); @@ -231,6 +233,16 @@ export default function DashboardPage() {

+ {config?.providers?.some((p) => p.capabilities.transcode) && ( + + )} {capabilities?.rescan && (