//! FFmpeg HLS transcoder for local video files. //! //! `TranscodeManager` orchestrates on-demand transcoding: the first request for //! an item spawns an ffmpeg process and returns once the initial HLS playlist //! appears. Concurrent requests for the same item subscribe to a watch channel //! and wait without spawning duplicate processes. Transcoded segments are cached //! in `transcode_dir/{item_id}/` and cleaned up by a background task. use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::{ Arc, atomic::{AtomicU32, Ordering}, }; use std::time::{Duration, Instant}; use tokio::sync::{Mutex, watch}; use tracing::{info, warn, error}; use domain::{DomainError, DomainResult}; // ============================================================================ // Types // ============================================================================ #[derive(Clone, Debug)] pub enum TranscodeStatus { Ready, Failed(String), } // ============================================================================ // Manager // ============================================================================ pub struct TranscodeManager { pub transcode_dir: PathBuf, cleanup_ttl_hours: Arc, active: Arc>>>>, } impl TranscodeManager { pub fn new(transcode_dir: PathBuf, cleanup_ttl_hours: u32) -> Arc { let mgr = Arc::new(Self { transcode_dir, cleanup_ttl_hours: Arc::new(AtomicU32::new(cleanup_ttl_hours)), active: Arc::new(Mutex::new(HashMap::new())), }); // Background cleanup task — uses Weak to avoid keeping manager alive. let weak = Arc::downgrade(&mgr); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(3600)); loop { interval.tick().await; match weak.upgrade() { Some(m) => m.run_cleanup().await, None => break, } } }); mgr } /// Update the cleanup TTL (also persisted to DB by the route handler). pub fn set_cleanup_ttl(&self, hours: u32) { self.cleanup_ttl_hours.store(hours, Ordering::Relaxed); } pub fn get_cleanup_ttl(&self) -> u32 { self.cleanup_ttl_hours.load(Ordering::Relaxed) } /// Ensure `item_id` has been transcoded to HLS. Blocks until the initial /// playlist appears or an error occurs. Concurrent callers share the result. pub async fn ensure_transcoded(&self, item_id: &str, src_path: &Path) -> DomainResult<()> { let out_dir = self.transcode_dir.join(item_id); let playlist = out_dir.join("playlist.m3u8"); if playlist.exists() { return Ok(()); } let mut rx = { let mut map = self.active.lock().await; if let Some(tx) = map.get(item_id) { tx.subscribe() } else { let (tx, rx) = watch::channel::>(None); map.insert(item_id.to_string(), tx.clone()); let item_id_owned = item_id.to_string(); let src_owned = src_path.to_path_buf(); let out_dir_owned = out_dir.clone(); let playlist_owned = playlist.clone(); let active_ref = Arc::clone(&self.active); tokio::spawn(async move { let _ = tokio::fs::create_dir_all(&out_dir_owned).await; let status = do_transcode(&src_owned, &out_dir_owned, &playlist_owned).await; if matches!(status, TranscodeStatus::Ready) { info!("transcode ready: {}", item_id_owned); } else if let TranscodeStatus::Failed(ref e) = status { error!("transcode failed for {}: {}", item_id_owned, e); } let _ = tx.send(Some(status)); active_ref.lock().await.remove(&item_id_owned); }); rx } }; // Wait for Ready or Failed. loop { rx.changed().await.map_err(|_| { DomainError::InfrastructureError("transcode task dropped unexpectedly".into()) })?; if let Some(status) = &*rx.borrow() { return match status { TranscodeStatus::Ready => Ok(()), TranscodeStatus::Failed(e) => Err(DomainError::InfrastructureError( format!("transcode failed: {}", e), )), }; } } } /// Remove all cached transcode directories. pub async fn clear_cache(&self) -> std::io::Result<()> { if self.transcode_dir.exists() { tokio::fs::remove_dir_all(&self.transcode_dir).await?; } tokio::fs::create_dir_all(&self.transcode_dir).await } /// Return `(total_bytes, item_count)` for the cache directory. pub async fn cache_stats(&self) -> (u64, usize) { let mut total_bytes = 0u64; let mut item_count = 0usize; let Ok(mut entries) = tokio::fs::read_dir(&self.transcode_dir).await else { return (0, 0); }; while let Ok(Some(entry)) = entries.next_entry().await { if !entry.path().is_dir() { continue; } item_count += 1; if let Ok(mut sub) = tokio::fs::read_dir(entry.path()).await { while let Ok(Some(f)) = sub.next_entry().await { if let Ok(meta) = f.metadata().await { total_bytes += meta.len(); } } } } (total_bytes, item_count) } async fn run_cleanup(&self) { let ttl_hours = self.cleanup_ttl_hours.load(Ordering::Relaxed) as u64; let ttl = Duration::from_secs(ttl_hours * 3600); let now = std::time::SystemTime::now(); let Ok(mut entries) = tokio::fs::read_dir(&self.transcode_dir).await else { return; }; while let Ok(Some(entry)) = entries.next_entry().await { let path = entry.path(); if !path.is_dir() { continue; } let playlist = path.join("playlist.m3u8"); if let Ok(meta) = tokio::fs::metadata(&playlist).await { if let Ok(modified) = meta.modified() { if let Ok(age) = now.duration_since(modified) { if age > ttl { warn!("cleanup: removing stale transcode {:?}", path); let _ = tokio::fs::remove_dir_all(&path).await; } } } } } } } // ============================================================================ // FFmpeg helper // ============================================================================ async fn do_transcode(src: &Path, out_dir: &Path, playlist: &Path) -> TranscodeStatus { let segment_pattern = out_dir.join("seg%05d.ts"); let mut child = match tokio::process::Command::new("ffmpeg") .args([ "-i", src.to_str().unwrap_or(""), "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-b:a", "128k", "-hls_time", "6", "-hls_list_size", "0", "-hls_flags", "independent_segments", "-hls_segment_filename", segment_pattern.to_str().unwrap_or(""), playlist.to_str().unwrap_or(""), ]) .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) .spawn() { Ok(c) => c, Err(e) => return TranscodeStatus::Failed(format!("ffmpeg spawn error: {}", e)), }; // Poll for playlist.m3u8 — it appears after the first segment is written, // allowing the client to start playback before transcoding is complete. let start = Instant::now(); let timeout = Duration::from_secs(60); loop { if playlist.exists() { return TranscodeStatus::Ready; } if start.elapsed() > timeout { let _ = child.kill().await; return TranscodeStatus::Failed("timeout waiting for transcode to start".into()); } match child.try_wait() { Ok(Some(status)) => { return if playlist.exists() { TranscodeStatus::Ready } else if status.success() { TranscodeStatus::Failed("ffmpeg exited but produced no playlist".into()) } else { TranscodeStatus::Failed("ffmpeg exited with non-zero status".into()) }; } Err(e) => return TranscodeStatus::Failed(e.to_string()), Ok(None) => {} } tokio::time::sleep(Duration::from_millis(100)).await; } }