feat(transcoding): add FFmpeg HLS transcoding support
- Introduced `TranscodeManager` for managing on-demand transcoding of local video files. - Added configuration options for transcoding in `Config` and `LocalFilesConfig`. - Implemented new API routes for managing transcoding settings, stats, and cache. - Updated `LocalFilesProvider` to support transcoding capabilities. - Created frontend components for managing transcode settings and displaying stats. - Added database migration for transcode settings. - Enhanced existing routes and DTOs to accommodate new transcoding features.
This commit is contained in:
254
k-tv-backend/infra/src/local_files/transcoder.rs
Normal file
254
k-tv-backend/infra/src/local_files/transcoder.rs
Normal file
@@ -0,0 +1,254 @@
|
||||
//! FFmpeg HLS transcoder for local video files.
|
||||
//!
|
||||
//! `TranscodeManager` orchestrates on-demand transcoding: the first request for
|
||||
//! an item spawns an ffmpeg process and returns once the initial HLS playlist
|
||||
//! appears. Concurrent requests for the same item subscribe to a watch channel
|
||||
//! and wait without spawning duplicate processes. Transcoded segments are cached
|
||||
//! in `transcode_dir/{item_id}/` and cleaned up by a background task.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU32, Ordering},
|
||||
};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tokio::sync::{Mutex, watch};
|
||||
use tracing::{info, warn, error};
|
||||
|
||||
use domain::{DomainError, DomainResult};
|
||||
|
||||
// ============================================================================
|
||||
// Types
|
||||
// ============================================================================
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum TranscodeStatus {
|
||||
Ready,
|
||||
Failed(String),
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Manager
|
||||
// ============================================================================
|
||||
|
||||
pub struct TranscodeManager {
|
||||
pub transcode_dir: PathBuf,
|
||||
cleanup_ttl_hours: Arc<AtomicU32>,
|
||||
active: Arc<Mutex<HashMap<String, watch::Sender<Option<TranscodeStatus>>>>>,
|
||||
}
|
||||
|
||||
impl TranscodeManager {
|
||||
pub fn new(transcode_dir: PathBuf, cleanup_ttl_hours: u32) -> Arc<Self> {
|
||||
let mgr = Arc::new(Self {
|
||||
transcode_dir,
|
||||
cleanup_ttl_hours: Arc::new(AtomicU32::new(cleanup_ttl_hours)),
|
||||
active: Arc::new(Mutex::new(HashMap::new())),
|
||||
});
|
||||
// Background cleanup task — uses Weak to avoid keeping manager alive.
|
||||
let weak = Arc::downgrade(&mgr);
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(3600));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
match weak.upgrade() {
|
||||
Some(m) => m.run_cleanup().await,
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
mgr
|
||||
}
|
||||
|
||||
/// Update the cleanup TTL (also persisted to DB by the route handler).
|
||||
pub fn set_cleanup_ttl(&self, hours: u32) {
|
||||
self.cleanup_ttl_hours.store(hours, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get_cleanup_ttl(&self) -> u32 {
|
||||
self.cleanup_ttl_hours.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Ensure `item_id` has been transcoded to HLS. Blocks until the initial
|
||||
/// playlist appears or an error occurs. Concurrent callers share the result.
|
||||
pub async fn ensure_transcoded(&self, item_id: &str, src_path: &Path) -> DomainResult<()> {
|
||||
let out_dir = self.transcode_dir.join(item_id);
|
||||
let playlist = out_dir.join("playlist.m3u8");
|
||||
|
||||
if playlist.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut rx = {
|
||||
let mut map = self.active.lock().await;
|
||||
if let Some(tx) = map.get(item_id) {
|
||||
tx.subscribe()
|
||||
} else {
|
||||
let (tx, rx) = watch::channel::<Option<TranscodeStatus>>(None);
|
||||
map.insert(item_id.to_string(), tx.clone());
|
||||
|
||||
let item_id_owned = item_id.to_string();
|
||||
let src_owned = src_path.to_path_buf();
|
||||
let out_dir_owned = out_dir.clone();
|
||||
let playlist_owned = playlist.clone();
|
||||
let active_ref = Arc::clone(&self.active);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::fs::create_dir_all(&out_dir_owned).await;
|
||||
let status = do_transcode(&src_owned, &out_dir_owned, &playlist_owned).await;
|
||||
if matches!(status, TranscodeStatus::Ready) {
|
||||
info!("transcode ready: {}", item_id_owned);
|
||||
} else if let TranscodeStatus::Failed(ref e) = status {
|
||||
error!("transcode failed for {}: {}", item_id_owned, e);
|
||||
}
|
||||
let _ = tx.send(Some(status));
|
||||
active_ref.lock().await.remove(&item_id_owned);
|
||||
});
|
||||
|
||||
rx
|
||||
}
|
||||
};
|
||||
|
||||
// Wait for Ready or Failed.
|
||||
loop {
|
||||
rx.changed().await.map_err(|_| {
|
||||
DomainError::InfrastructureError("transcode task dropped unexpectedly".into())
|
||||
})?;
|
||||
if let Some(status) = &*rx.borrow() {
|
||||
return match status {
|
||||
TranscodeStatus::Ready => Ok(()),
|
||||
TranscodeStatus::Failed(e) => Err(DomainError::InfrastructureError(
|
||||
format!("transcode failed: {}", e),
|
||||
)),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove all cached transcode directories.
|
||||
pub async fn clear_cache(&self) -> std::io::Result<()> {
|
||||
if self.transcode_dir.exists() {
|
||||
tokio::fs::remove_dir_all(&self.transcode_dir).await?;
|
||||
}
|
||||
tokio::fs::create_dir_all(&self.transcode_dir).await
|
||||
}
|
||||
|
||||
/// Return `(total_bytes, item_count)` for the cache directory.
|
||||
pub async fn cache_stats(&self) -> (u64, usize) {
|
||||
let mut total_bytes = 0u64;
|
||||
let mut item_count = 0usize;
|
||||
let Ok(mut entries) = tokio::fs::read_dir(&self.transcode_dir).await else {
|
||||
return (0, 0);
|
||||
};
|
||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||
if !entry.path().is_dir() {
|
||||
continue;
|
||||
}
|
||||
item_count += 1;
|
||||
if let Ok(mut sub) = tokio::fs::read_dir(entry.path()).await {
|
||||
while let Ok(Some(f)) = sub.next_entry().await {
|
||||
if let Ok(meta) = f.metadata().await {
|
||||
total_bytes += meta.len();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
(total_bytes, item_count)
|
||||
}
|
||||
|
||||
async fn run_cleanup(&self) {
|
||||
let ttl_hours = self.cleanup_ttl_hours.load(Ordering::Relaxed) as u64;
|
||||
let ttl = Duration::from_secs(ttl_hours * 3600);
|
||||
let now = std::time::SystemTime::now();
|
||||
|
||||
let Ok(mut entries) = tokio::fs::read_dir(&self.transcode_dir).await else {
|
||||
return;
|
||||
};
|
||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||
let path = entry.path();
|
||||
if !path.is_dir() {
|
||||
continue;
|
||||
}
|
||||
let playlist = path.join("playlist.m3u8");
|
||||
if let Ok(meta) = tokio::fs::metadata(&playlist).await {
|
||||
if let Ok(modified) = meta.modified() {
|
||||
if let Ok(age) = now.duration_since(modified) {
|
||||
if age > ttl {
|
||||
warn!("cleanup: removing stale transcode {:?}", path);
|
||||
let _ = tokio::fs::remove_dir_all(&path).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// FFmpeg helper
|
||||
// ============================================================================
|
||||
|
||||
async fn do_transcode(src: &Path, out_dir: &Path, playlist: &Path) -> TranscodeStatus {
|
||||
let segment_pattern = out_dir.join("seg%05d.ts");
|
||||
|
||||
let mut child = match tokio::process::Command::new("ffmpeg")
|
||||
.args([
|
||||
"-i",
|
||||
src.to_str().unwrap_or(""),
|
||||
"-c:v",
|
||||
"libx264",
|
||||
"-preset",
|
||||
"fast",
|
||||
"-crf",
|
||||
"23",
|
||||
"-c:a",
|
||||
"aac",
|
||||
"-b:a",
|
||||
"128k",
|
||||
"-hls_time",
|
||||
"6",
|
||||
"-hls_list_size",
|
||||
"0",
|
||||
"-hls_flags",
|
||||
"independent_segments",
|
||||
"-hls_segment_filename",
|
||||
segment_pattern.to_str().unwrap_or(""),
|
||||
playlist.to_str().unwrap_or(""),
|
||||
])
|
||||
.stdout(std::process::Stdio::null())
|
||||
.stderr(std::process::Stdio::null())
|
||||
.spawn()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => return TranscodeStatus::Failed(format!("ffmpeg spawn error: {}", e)),
|
||||
};
|
||||
|
||||
// Poll for playlist.m3u8 — it appears after the first segment is written,
|
||||
// allowing the client to start playback before transcoding is complete.
|
||||
let start = Instant::now();
|
||||
let timeout = Duration::from_secs(60);
|
||||
loop {
|
||||
if playlist.exists() {
|
||||
return TranscodeStatus::Ready;
|
||||
}
|
||||
if start.elapsed() > timeout {
|
||||
let _ = child.kill().await;
|
||||
return TranscodeStatus::Failed("timeout waiting for transcode to start".into());
|
||||
}
|
||||
match child.try_wait() {
|
||||
Ok(Some(status)) => {
|
||||
return if playlist.exists() {
|
||||
TranscodeStatus::Ready
|
||||
} else if status.success() {
|
||||
TranscodeStatus::Failed("ffmpeg exited but produced no playlist".into())
|
||||
} else {
|
||||
TranscodeStatus::Failed("ffmpeg exited with non-zero status".into())
|
||||
};
|
||||
}
|
||||
Err(e) => return TranscodeStatus::Failed(e.to_string()),
|
||||
Ok(None) => {}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user