diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2564b08
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+transcode/
diff --git a/k-tv-backend/Cargo.lock b/k-tv-backend/Cargo.lock
index 745e6b2..e419507 100644
--- a/k-tv-backend/Cargo.lock
+++ b/k-tv-backend/Cargo.lock
@@ -84,6 +84,7 @@ dependencies = [
"serde",
"serde_json",
"serde_qs",
+ "sqlx",
"thiserror 2.0.17",
"time",
"tokio",
diff --git a/k-tv-backend/api/Cargo.toml b/k-tv-backend/api/Cargo.toml
index a8287ee..d114c93 100644
--- a/k-tv-backend/api/Cargo.toml
+++ b/k-tv-backend/api/Cargo.toml
@@ -11,7 +11,7 @@ postgres = ["infra/postgres"]
auth-oidc = ["infra/auth-oidc"]
auth-jwt = ["infra/auth-jwt"]
jellyfin = ["infra/jellyfin"]
-local-files = ["infra/local-files", "dep:tokio-util"]
+local-files = ["infra/local-files", "dep:tokio-util", "dep:sqlx"]
[profile.release]
strip = true
@@ -61,3 +61,4 @@ async-trait = "0.1"
dotenvy = "0.15.7"
time = "0.3"
tokio-util = { version = "0.7", features = ["io"], optional = true }
+sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite"], optional = true }
diff --git a/k-tv-backend/api/src/config.rs b/k-tv-backend/api/src/config.rs
index f8051d7..0ed73c2 100644
--- a/k-tv-backend/api/src/config.rs
+++ b/k-tv-backend/api/src/config.rs
@@ -44,6 +44,11 @@ pub struct Config {
/// Root directory for the local-files provider. Set `LOCAL_FILES_DIR` to enable.
pub local_files_dir: Option,
+ /// Directory for FFmpeg HLS transcode cache. Set `TRANSCODE_DIR` to enable transcoding.
+ pub transcode_dir: Option,
+ /// How long (hours) to keep transcode cache entries before cleanup. Default 24.
+ pub transcode_cleanup_ttl_hours: u32,
+
/// Public base URL of this API server (used to build IPTV stream URLs).
pub base_url: String,
}
@@ -120,6 +125,12 @@ impl Config {
let local_files_dir = env::var("LOCAL_FILES_DIR").ok().map(PathBuf::from);
+ let transcode_dir = env::var("TRANSCODE_DIR").ok().map(PathBuf::from);
+ let transcode_cleanup_ttl_hours = env::var("TRANSCODE_CLEANUP_TTL_HOURS")
+ .ok()
+ .and_then(|s| s.parse().ok())
+ .unwrap_or(24);
+
let base_url = env::var("BASE_URL")
.unwrap_or_else(|_| format!("http://localhost:{}", port));
@@ -147,6 +158,8 @@ impl Config {
jellyfin_api_key,
jellyfin_user_id,
local_files_dir,
+ transcode_dir,
+ transcode_cleanup_ttl_hours,
base_url,
}
}
diff --git a/k-tv-backend/api/src/dto.rs b/k-tv-backend/api/src/dto.rs
index 30aa8ac..845d541 100644
--- a/k-tv-backend/api/src/dto.rs
+++ b/k-tv-backend/api/src/dto.rs
@@ -235,6 +235,26 @@ pub struct ScheduleResponse {
pub slots: Vec,
}
+// ============================================================================
+// Transcode DTOs
+// ============================================================================
+
+#[derive(Debug, Serialize)]
+pub struct TranscodeSettingsResponse {
+ pub cleanup_ttl_hours: u32,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct UpdateTranscodeSettingsRequest {
+ pub cleanup_ttl_hours: u32,
+}
+
+#[derive(Debug, Serialize)]
+pub struct TranscodeStatsResponse {
+ pub cache_size_bytes: u64,
+ pub item_count: usize,
+}
+
impl From for ScheduleResponse {
fn from(s: domain::GeneratedSchedule) -> Self {
Self {
diff --git a/k-tv-backend/api/src/main.rs b/k-tv-backend/api/src/main.rs
index 57556ab..99fe37a 100644
--- a/k-tv-backend/api/src/main.rs
+++ b/k-tv-backend/api/src/main.rs
@@ -75,6 +75,10 @@ async fn main() -> anyhow::Result<()> {
// Build provider registry — all configured providers are registered simultaneously.
#[cfg(feature = "local-files")]
let mut local_index: Option> = None;
+ #[cfg(feature = "local-files")]
+ let mut transcode_manager: Option> = None;
+ #[cfg(feature = "local-files")]
+ let mut sqlite_pool_for_state: Option = None;
let mut registry = infra::ProviderRegistry::new();
@@ -99,12 +103,41 @@ async fn main() -> anyhow::Result<()> {
let lf_cfg = infra::LocalFilesConfig {
root_dir: dir.clone(),
base_url: config.base_url.clone(),
+ transcode_dir: config.transcode_dir.clone(),
+ cleanup_ttl_hours: config.transcode_cleanup_ttl_hours,
};
let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await);
local_index = Some(Arc::clone(&idx));
let scan_idx = Arc::clone(&idx);
tokio::spawn(async move { scan_idx.rescan().await; });
- registry.register("local", Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg)));
+
+ // Build TranscodeManager if TRANSCODE_DIR is set.
+ let tm = config.transcode_dir.as_ref().map(|td| {
+ std::fs::create_dir_all(td).ok();
+ tracing::info!("Transcoding enabled; cache dir: {:?}", td);
+ let tm = infra::TranscodeManager::new(td.clone(), config.transcode_cleanup_ttl_hours);
+ // Load persisted TTL from DB.
+ let tm_clone = Arc::clone(&tm);
+ let pool_clone = sqlite_pool.clone();
+ tokio::spawn(async move {
+ if let Ok(row) = sqlx::query_as::<_, (i64,)>(
+ "SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1",
+ )
+ .fetch_one(&pool_clone)
+ .await
+ {
+ tm_clone.set_cleanup_ttl(row.0 as u32);
+ }
+ });
+ tm
+ });
+
+ registry.register(
+ "local",
+ Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg, tm.clone())),
+ );
+ transcode_manager = tm;
+ sqlite_pool_for_state = Some(sqlite_pool.clone());
} else {
tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR");
}
@@ -137,6 +170,8 @@ async fn main() -> anyhow::Result<()> {
#[cfg(feature = "local-files")]
{
state.local_index = local_index;
+ state.transcode_manager = transcode_manager;
+ state.sqlite_pool = sqlite_pool_for_state;
}
let server_config = ServerConfig {
@@ -206,6 +241,7 @@ impl IMediaProvider for NoopMediaProvider {
search: false,
streaming_protocol: StreamingProtocol::DirectFile,
rescan: false,
+ transcode: false,
}
}
diff --git a/k-tv-backend/api/src/routes/config.rs b/k-tv-backend/api/src/routes/config.rs
index 0bed44c..aca3f04 100644
--- a/k-tv-backend/api/src/routes/config.rs
+++ b/k-tv-backend/api/src/routes/config.rs
@@ -33,6 +33,7 @@ async fn get_config(State(state): State) -> Json {
search: false,
streaming_protocol: StreamingProtocol::DirectFile,
rescan: false,
+ transcode: false,
});
Json(ConfigResponse {
diff --git a/k-tv-backend/api/src/routes/files.rs b/k-tv-backend/api/src/routes/files.rs
index e69ae74..90eaf64 100644
--- a/k-tv-backend/api/src/routes/files.rs
+++ b/k-tv-backend/api/src/routes/files.rs
@@ -1,30 +1,58 @@
-//! Local-file streaming and rescan routes
+//! Local-file streaming, rescan, and transcode routes.
//!
-//! GET /files/stream/:encoded_id — serve a local video file with Range support
-//! POST /files/rescan — trigger an index rebuild (auth required)
+//! GET /files/stream/:id — Range streaming (no auth)
+//! POST /files/rescan — index rebuild (auth required)
+//! GET /files/transcode/:id/playlist.m3u8 — trigger transcode + serve playlist
+//! GET /files/transcode/:id/:segment — serve .ts / .m3u8 segment
+//! GET /files/transcode-settings — read TTL (auth)
+//! PUT /files/transcode-settings — update TTL (auth)
+//! GET /files/transcode-stats — cache size (auth)
+//! DELETE /files/transcode-cache — clear cache (auth)
use axum::{
Router,
extract::{Path, State},
- http::{HeaderMap, StatusCode},
+ http::HeaderMap,
response::Response,
- routing::{get, post},
+ routing::get,
};
-use crate::{error::ApiError, extractors::CurrentUser, state::AppState};
+use crate::{error::ApiError, state::AppState};
+
+#[cfg(feature = "local-files")]
+use axum::{
+ Json,
+ http::StatusCode,
+ routing::{delete, post},
+};
+#[cfg(feature = "local-files")]
+use serde::Deserialize;
+#[cfg(feature = "local-files")]
+use crate::{
+ dto::{TranscodeSettingsResponse, TranscodeStatsResponse, UpdateTranscodeSettingsRequest},
+ extractors::CurrentUser,
+};
pub fn router() -> Router {
let r = Router::new().route("/stream/{id}", get(stream_file));
#[cfg(feature = "local-files")]
- let r = r.route("/rescan", post(trigger_rescan));
+ let r = r
+ .route("/rescan", post(trigger_rescan))
+ .route("/transcode/{id}/playlist.m3u8", get(transcode_playlist))
+ .route("/transcode/{id}/{segment}", get(transcode_segment))
+ .route(
+ "/transcode-settings",
+ get(get_transcode_settings).put(update_transcode_settings),
+ )
+ .route("/transcode-stats", get(get_transcode_stats))
+ .route("/transcode-cache", delete(clear_transcode_cache));
r
}
-/// Stream a local video file, honouring `Range` headers for seeking.
-///
-/// The path segment is a base64url-encoded relative path produced by the
-/// `LocalFilesProvider`. No authentication required — the ID is not guessable
-/// without knowing the filesystem layout.
+// ============================================================================
+// Direct streaming
+// ============================================================================
+
async fn stream_file(
State(state): State,
Path(encoded_id): Path,
@@ -44,7 +72,6 @@ async fn stream_file(
let rel = infra::local_files::decode_stream_id(&encoded_id)
.ok_or_else(|| ApiError::validation("invalid stream id"))?;
- // Security: canonicalise and verify the path stays inside root.
let full_path = root_dir.join(&rel);
let canonical_root = root_dir
.canonicalize()
@@ -72,7 +99,6 @@ async fn stream_file(
.to_lowercase();
let content_type = content_type_for_ext(&ext);
- // Parse Range header.
let range = headers
.get(axum::http::header::RANGE)
.and_then(|v| v.to_str().ok())
@@ -112,20 +138,208 @@ async fn stream_file(
Err(ApiError::not_implemented("local-files feature not enabled"))
}
-/// Trigger a filesystem rescan and return the number of items found.
+// ============================================================================
+// Rescan
+// ============================================================================
+
#[cfg(feature = "local-files")]
async fn trigger_rescan(
State(state): State,
CurrentUser(_user): CurrentUser,
-) -> Result, ApiError> {
+) -> Result, ApiError> {
let index = state
.local_index
.as_ref()
.ok_or_else(|| ApiError::not_implemented("no local files provider active"))?;
let count = index.rescan().await;
- Ok(axum::Json(serde_json::json!({ "items_found": count })))
+ Ok(Json(serde_json::json!({ "items_found": count })))
}
+// ============================================================================
+// Transcode endpoints
+// ============================================================================
+
+#[cfg(feature = "local-files")]
+async fn transcode_playlist(
+ State(state): State,
+ Path(id): Path,
+) -> Result {
+ let tm = state
+ .transcode_manager
+ .as_ref()
+ .ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
+
+ let root = state.config.local_files_dir.as_ref().ok_or_else(|| {
+ ApiError::not_implemented("LOCAL_FILES_DIR not configured")
+ })?;
+
+ let rel = infra::local_files::decode_stream_id(&id)
+ .ok_or_else(|| ApiError::validation("invalid item id"))?;
+ let src = root.join(&rel);
+
+ tm.ensure_transcoded(&id, &src)
+ .await
+ .map_err(|e| ApiError::internal(e.to_string()))?;
+
+ let playlist_path = tm.transcode_dir.join(&id).join("playlist.m3u8");
+ let content = tokio::fs::read_to_string(&playlist_path)
+ .await
+ .map_err(|e| ApiError::internal(e.to_string()))?;
+
+ Response::builder()
+ .status(200)
+ .header("Content-Type", "application/vnd.apple.mpegurl")
+ .header("Cache-Control", "no-cache")
+ .body(axum::body::Body::from(content))
+ .map_err(|e| ApiError::internal(e.to_string()))
+}
+
+#[derive(Deserialize)]
+#[cfg(feature = "local-files")]
+struct TranscodeSegmentPath {
+ id: String,
+ segment: String,
+}
+
+#[cfg(feature = "local-files")]
+async fn transcode_segment(
+ State(state): State,
+ Path(params): Path,
+) -> Result {
+ let TranscodeSegmentPath { id, segment } = params;
+
+ let ext = std::path::Path::new(&segment)
+ .extension()
+ .and_then(|e| e.to_str())
+ .unwrap_or("");
+ if ext != "ts" && ext != "m3u8" {
+ return Err(ApiError::not_found("invalid segment extension"));
+ }
+ if segment.contains('/') || segment.contains("..") {
+ return Err(ApiError::Forbidden("invalid segment path".into()));
+ }
+
+ let tm = state
+ .transcode_manager
+ .as_ref()
+ .ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
+
+ let file_path = tm.transcode_dir.join(&id).join(&segment);
+ let canonical_base = tm
+ .transcode_dir
+ .canonicalize()
+ .map_err(|e| ApiError::internal(e.to_string()))?;
+ let canonical_file = file_path
+ .canonicalize()
+ .map_err(|_| ApiError::not_found("segment not found"))?;
+ if !canonical_file.starts_with(&canonical_base) {
+ return Err(ApiError::Forbidden("path traversal detected".into()));
+ }
+
+ let content = tokio::fs::read(&canonical_file)
+ .await
+ .map_err(|_| ApiError::not_found("segment not found"))?;
+
+ let content_type = if ext == "ts" {
+ "video/mp2t"
+ } else {
+ "application/vnd.apple.mpegurl"
+ };
+
+ Response::builder()
+ .status(200)
+ .header("Content-Type", content_type)
+ .body(axum::body::Body::from(content))
+ .map_err(|e| ApiError::internal(e.to_string()))
+}
+
+// ============================================================================
+// Transcode settings / stats / cache management
+// ============================================================================
+
+#[cfg(feature = "local-files")]
+async fn get_transcode_settings(
+ State(state): State,
+ CurrentUser(_user): CurrentUser,
+) -> Result, ApiError> {
+ let pool = state
+ .sqlite_pool
+ .as_ref()
+ .ok_or_else(|| ApiError::not_implemented("sqlite not available"))?;
+
+ let (ttl,): (i64,) =
+ sqlx::query_as("SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1")
+ .fetch_one(pool)
+ .await
+ .map_err(|e| ApiError::internal(e.to_string()))?;
+
+ Ok(Json(TranscodeSettingsResponse {
+ cleanup_ttl_hours: ttl as u32,
+ }))
+}
+
+#[cfg(feature = "local-files")]
+async fn update_transcode_settings(
+ State(state): State,
+ CurrentUser(_user): CurrentUser,
+ Json(req): Json,
+) -> Result, ApiError> {
+ let pool = state
+ .sqlite_pool
+ .as_ref()
+ .ok_or_else(|| ApiError::not_implemented("sqlite not available"))?;
+
+ let ttl = req.cleanup_ttl_hours as i64;
+ sqlx::query("UPDATE transcode_settings SET cleanup_ttl_hours = ? WHERE id = 1")
+ .bind(ttl)
+ .execute(pool)
+ .await
+ .map_err(|e| ApiError::internal(e.to_string()))?;
+
+ if let Some(tm) = &state.transcode_manager {
+ tm.set_cleanup_ttl(req.cleanup_ttl_hours);
+ }
+
+ Ok(Json(TranscodeSettingsResponse {
+ cleanup_ttl_hours: req.cleanup_ttl_hours,
+ }))
+}
+
+#[cfg(feature = "local-files")]
+async fn get_transcode_stats(
+ State(state): State,
+ CurrentUser(_user): CurrentUser,
+) -> Result, ApiError> {
+ let tm = state
+ .transcode_manager
+ .as_ref()
+ .ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
+ let (cache_size_bytes, item_count) = tm.cache_stats().await;
+ Ok(Json(TranscodeStatsResponse {
+ cache_size_bytes,
+ item_count,
+ }))
+}
+
+#[cfg(feature = "local-files")]
+async fn clear_transcode_cache(
+ State(state): State,
+ CurrentUser(_user): CurrentUser,
+) -> Result {
+ let tm = state
+ .transcode_manager
+ .as_ref()
+ .ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
+ tm.clear_cache()
+ .await
+ .map_err(|e| ApiError::internal(e.to_string()))?;
+ Ok(StatusCode::NO_CONTENT)
+}
+
+// ============================================================================
+// Helpers
+// ============================================================================
+
fn content_type_for_ext(ext: &str) -> &'static str {
match ext {
"mp4" | "m4v" => "video/mp4",
diff --git a/k-tv-backend/api/src/state.rs b/k-tv-backend/api/src/state.rs
index 224c624..8298a62 100644
--- a/k-tv-backend/api/src/state.rs
+++ b/k-tv-backend/api/src/state.rs
@@ -28,6 +28,12 @@ pub struct AppState {
/// Index for the local-files provider, used by the rescan route.
#[cfg(feature = "local-files")]
pub local_index: Option>,
+ /// TranscodeManager for FFmpeg HLS transcoding (requires TRANSCODE_DIR).
+ #[cfg(feature = "local-files")]
+ pub transcode_manager: Option>,
+ /// SQLite pool for transcode settings CRUD.
+ #[cfg(feature = "local-files")]
+ pub sqlite_pool: Option,
}
impl AppState {
@@ -110,6 +116,10 @@ impl AppState {
config: Arc::new(config),
#[cfg(feature = "local-files")]
local_index: None,
+ #[cfg(feature = "local-files")]
+ transcode_manager: None,
+ #[cfg(feature = "local-files")]
+ sqlite_pool: None,
})
}
}
diff --git a/k-tv-backend/domain/src/ports.rs b/k-tv-backend/domain/src/ports.rs
index 9bd9a5f..1a7f568 100644
--- a/k-tv-backend/domain/src/ports.rs
+++ b/k-tv-backend/domain/src/ports.rs
@@ -54,6 +54,8 @@ pub struct ProviderCapabilities {
pub streaming_protocol: StreamingProtocol,
/// Whether `POST /files/rescan` is available.
pub rescan: bool,
+ /// Whether on-demand FFmpeg transcoding to HLS is available.
+ pub transcode: bool,
}
// ============================================================================
diff --git a/k-tv-backend/infra/src/jellyfin/provider.rs b/k-tv-backend/infra/src/jellyfin/provider.rs
index ec20a7f..74d2ede 100644
--- a/k-tv-backend/infra/src/jellyfin/provider.rs
+++ b/k-tv-backend/infra/src/jellyfin/provider.rs
@@ -139,6 +139,7 @@ impl IMediaProvider for JellyfinMediaProvider {
search: true,
streaming_protocol: StreamingProtocol::Hls,
rescan: false,
+ transcode: false,
}
}
diff --git a/k-tv-backend/infra/src/lib.rs b/k-tv-backend/infra/src/lib.rs
index 61aaf2d..c8e1125 100644
--- a/k-tv-backend/infra/src/lib.rs
+++ b/k-tv-backend/infra/src/lib.rs
@@ -40,4 +40,4 @@ pub use schedule_repository::SqliteScheduleRepository;
pub use jellyfin::{JellyfinConfig, JellyfinMediaProvider};
#[cfg(feature = "local-files")]
-pub use local_files::{LocalFilesConfig, LocalFilesProvider, LocalIndex, decode_stream_id};
+pub use local_files::{LocalFilesConfig, LocalFilesProvider, LocalIndex, TranscodeManager, decode_stream_id};
diff --git a/k-tv-backend/infra/src/local_files/config.rs b/k-tv-backend/infra/src/local_files/config.rs
index 0246ba3..f34e746 100644
--- a/k-tv-backend/infra/src/local_files/config.rs
+++ b/k-tv-backend/infra/src/local_files/config.rs
@@ -6,4 +6,8 @@ pub struct LocalFilesConfig {
pub root_dir: PathBuf,
/// Public base URL of this API server, used to build stream URLs.
pub base_url: String,
+ /// Directory for FFmpeg HLS transcode cache. `None` disables transcoding.
+ pub transcode_dir: Option,
+ /// How long (hours) to keep transcode cache entries. Passed to TranscodeManager.
+ pub cleanup_ttl_hours: u32,
}
diff --git a/k-tv-backend/infra/src/local_files/mod.rs b/k-tv-backend/infra/src/local_files/mod.rs
index 1e12d66..0cce7cc 100644
--- a/k-tv-backend/infra/src/local_files/mod.rs
+++ b/k-tv-backend/infra/src/local_files/mod.rs
@@ -2,7 +2,9 @@ pub mod config;
pub mod index;
pub mod provider;
pub mod scanner;
+pub mod transcoder;
pub use config::LocalFilesConfig;
pub use index::LocalIndex;
pub use provider::{LocalFilesProvider, decode_stream_id};
+pub use transcoder::TranscodeManager;
diff --git a/k-tv-backend/infra/src/local_files/provider.rs b/k-tv-backend/infra/src/local_files/provider.rs
index df85f87..89c53aa 100644
--- a/k-tv-backend/infra/src/local_files/provider.rs
+++ b/k-tv-backend/infra/src/local_files/provider.rs
@@ -9,19 +9,26 @@ use domain::{
use super::config::LocalFilesConfig;
use super::index::{LocalIndex, decode_id};
use super::scanner::LocalFileItem;
+use super::transcoder::TranscodeManager;
pub struct LocalFilesProvider {
pub index: Arc,
base_url: String,
+ transcode_manager: Option>,
}
const SHORT_DURATION_SECS: u32 = 1200; // 20 minutes
impl LocalFilesProvider {
- pub fn new(index: Arc, config: LocalFilesConfig) -> Self {
+ pub fn new(
+ index: Arc,
+ config: LocalFilesConfig,
+ transcode_manager: Option>,
+ ) -> Self {
Self {
index,
base_url: config.base_url.trim_end_matches('/').to_string(),
+ transcode_manager,
}
}
}
@@ -57,8 +64,13 @@ impl IMediaProvider for LocalFilesProvider {
tags: true,
decade: true,
search: true,
- streaming_protocol: StreamingProtocol::DirectFile,
+ streaming_protocol: if self.transcode_manager.is_some() {
+ StreamingProtocol::Hls
+ } else {
+ StreamingProtocol::DirectFile
+ },
rescan: true,
+ transcode: self.transcode_manager.is_some(),
}
}
@@ -138,12 +150,27 @@ impl IMediaProvider for LocalFilesProvider {
.map(|item| to_media_item(item_id.clone(), &item)))
}
- async fn get_stream_url(&self, item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult {
- Ok(format!(
- "{}/api/v1/files/stream/{}",
- self.base_url,
- item_id.as_ref()
- ))
+ async fn get_stream_url(&self, item_id: &MediaItemId, quality: &StreamQuality) -> DomainResult {
+ match quality {
+ StreamQuality::Transcode(_) if self.transcode_manager.is_some() => {
+ let tm = self.transcode_manager.as_ref().unwrap();
+ let rel = decode_id(item_id).ok_or_else(|| {
+ DomainError::InfrastructureError("invalid item id encoding".into())
+ })?;
+ let src = self.index.root_dir.join(&rel);
+ tm.ensure_transcoded(item_id.as_ref(), &src).await?;
+ Ok(format!(
+ "{}/api/v1/files/transcode/{}/playlist.m3u8",
+ self.base_url,
+ item_id.as_ref()
+ ))
+ }
+ _ => Ok(format!(
+ "{}/api/v1/files/stream/{}",
+ self.base_url,
+ item_id.as_ref()
+ )),
+ }
}
async fn list_collections(&self) -> DomainResult> {
diff --git a/k-tv-backend/infra/src/local_files/transcoder.rs b/k-tv-backend/infra/src/local_files/transcoder.rs
new file mode 100644
index 0000000..7286367
--- /dev/null
+++ b/k-tv-backend/infra/src/local_files/transcoder.rs
@@ -0,0 +1,254 @@
+//! FFmpeg HLS transcoder for local video files.
+//!
+//! `TranscodeManager` orchestrates on-demand transcoding: the first request for
+//! an item spawns an ffmpeg process and returns once the initial HLS playlist
+//! appears. Concurrent requests for the same item subscribe to a watch channel
+//! and wait without spawning duplicate processes. Transcoded segments are cached
+//! in `transcode_dir/{item_id}/` and cleaned up by a background task.
+
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::{
+ Arc,
+ atomic::{AtomicU32, Ordering},
+};
+use std::time::{Duration, Instant};
+
+use tokio::sync::{Mutex, watch};
+use tracing::{info, warn, error};
+
+use domain::{DomainError, DomainResult};
+
+// ============================================================================
+// Types
+// ============================================================================
+
+#[derive(Clone, Debug)]
+pub enum TranscodeStatus {
+ Ready,
+ Failed(String),
+}
+
+// ============================================================================
+// Manager
+// ============================================================================
+
+pub struct TranscodeManager {
+ pub transcode_dir: PathBuf,
+ cleanup_ttl_hours: Arc,
+ active: Arc>>>>,
+}
+
+impl TranscodeManager {
+ pub fn new(transcode_dir: PathBuf, cleanup_ttl_hours: u32) -> Arc {
+ let mgr = Arc::new(Self {
+ transcode_dir,
+ cleanup_ttl_hours: Arc::new(AtomicU32::new(cleanup_ttl_hours)),
+ active: Arc::new(Mutex::new(HashMap::new())),
+ });
+ // Background cleanup task — uses Weak to avoid keeping manager alive.
+ let weak = Arc::downgrade(&mgr);
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(Duration::from_secs(3600));
+ loop {
+ interval.tick().await;
+ match weak.upgrade() {
+ Some(m) => m.run_cleanup().await,
+ None => break,
+ }
+ }
+ });
+ mgr
+ }
+
+ /// Update the cleanup TTL (also persisted to DB by the route handler).
+ pub fn set_cleanup_ttl(&self, hours: u32) {
+ self.cleanup_ttl_hours.store(hours, Ordering::Relaxed);
+ }
+
+ pub fn get_cleanup_ttl(&self) -> u32 {
+ self.cleanup_ttl_hours.load(Ordering::Relaxed)
+ }
+
+ /// Ensure `item_id` has been transcoded to HLS. Blocks until the initial
+ /// playlist appears or an error occurs. Concurrent callers share the result.
+ pub async fn ensure_transcoded(&self, item_id: &str, src_path: &Path) -> DomainResult<()> {
+ let out_dir = self.transcode_dir.join(item_id);
+ let playlist = out_dir.join("playlist.m3u8");
+
+ if playlist.exists() {
+ return Ok(());
+ }
+
+ let mut rx = {
+ let mut map = self.active.lock().await;
+ if let Some(tx) = map.get(item_id) {
+ tx.subscribe()
+ } else {
+ let (tx, rx) = watch::channel::
+ {config?.providers?.some((p) => p.capabilities.transcode) && (
+
+ )}
{capabilities?.rescan && (