feat: implement local-files feature with various enhancements and cleanup

This commit is contained in:
2026-03-17 03:00:39 +01:00
parent c4d2e48f73
commit d8dd047020
18 changed files with 160 additions and 131 deletions

View File

@@ -287,16 +287,19 @@ pub struct ScheduleResponse {
// Transcode DTOs
// ============================================================================
#[cfg(feature = "local-files")]
#[derive(Debug, Serialize)]
pub struct TranscodeSettingsResponse {
pub cleanup_ttl_hours: u32,
}
#[cfg(feature = "local-files")]
#[derive(Debug, Deserialize)]
pub struct UpdateTranscodeSettingsRequest {
pub cleanup_ttl_hours: u32,
}
#[cfg(feature = "local-files")]
#[derive(Debug, Serialize)]
pub struct TranscodeStatsResponse {
pub cache_size_bytes: u64,

View File

@@ -36,6 +36,7 @@ pub enum ApiError {
#[error("auth_required")]
AuthRequired,
#[allow(dead_code)]
#[error("Not found: {0}")]
NotFound(String),
@@ -165,10 +166,12 @@ impl ApiError {
Self::Validation(msg.into())
}
#[cfg(feature = "local-files")]
pub fn internal(msg: impl Into<String>) -> Self {
Self::Internal(msg.into())
}
#[cfg(feature = "local-files")]
pub fn not_found(msg: impl Into<String>) -> Self {
Self::NotFound(msg.into())
}
@@ -178,5 +181,3 @@ impl ApiError {
}
}
/// Result type alias for API handlers
pub type ApiResult<T> = Result<T, ApiError>;

View File

@@ -67,7 +67,7 @@ impl FromRequestParts<AppState> for OptionalCurrentUser {
let user = validate_jwt_token(&token, state).await.ok();
return Ok(OptionalCurrentUser(user));
}
return Ok(OptionalCurrentUser(None));
Ok(OptionalCurrentUser(None))
}
#[cfg(not(feature = "auth-jwt"))]

View File

@@ -268,7 +268,7 @@ mod tests {
ch
}
fn make_slot(channel_id: Uuid, slot_id: Uuid) -> domain::ScheduledSlot {
fn make_slot(_channel_id: Uuid, slot_id: Uuid) -> domain::ScheduledSlot {
use domain::entities::MediaItem;
let now = Utc::now();
domain::ScheduledSlot {
@@ -347,7 +347,7 @@ mod tests {
assert_eq!(cid, channel_id);
assert_eq!(s.id, slot_id);
}
other => panic!("expected BroadcastTransition, got something else"),
_other => panic!("expected BroadcastTransition, got something else"),
}
}

View File

@@ -47,8 +47,9 @@ pub async fn build_provider_registry(
}
#[cfg(feature = "local-files")]
"local_files" => {
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
if let Some(files_dir) = cfg_map.get("files_dir") {
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json)
&& let Some(files_dir) = cfg_map.get("files_dir")
{
let transcode_dir = cfg_map.get("transcode_dir")
.filter(|s| !s.is_empty())
.map(std::path::PathBuf::from);
@@ -72,11 +73,11 @@ pub async fn build_provider_registry(
let tm_clone = Arc::clone(tm);
let repo = build_transcode_settings_repository(db_pool).await.ok();
tokio::spawn(async move {
if let Some(r) = repo {
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
if let Some(r) = repo
&& let Ok(Some(ttl)) = r.load_cleanup_ttl().await
{
tm_clone.set_cleanup_ttl(ttl);
}
}
});
}
registry.register("local", bundle.provider);
@@ -87,7 +88,6 @@ pub async fn build_provider_registry(
}
}
}
}
_ => {}
}
}
@@ -124,11 +124,11 @@ pub async fn build_provider_registry(
let tm_clone = Arc::clone(tm);
let repo = build_transcode_settings_repository(db_pool).await.ok();
tokio::spawn(async move {
if let Some(r) = repo {
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
if let Some(r) = repo
&& let Ok(Some(ttl)) = r.load_cleanup_ttl().await
{
tm_clone.set_cleanup_ttl(ttl);
}
}
});
}
registry.register("local", bundle.provider);

View File

@@ -11,7 +11,7 @@ use axum::Router;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{get, post, put, delete};
use axum::routing::{get, post, put};
use axum::Json;
use domain::errors::DomainResult;
use domain::ProviderConfigRow;

View File

@@ -8,6 +8,7 @@ pub fn router() -> Router<AppState> {
Router::new().route("/", get(get_config))
}
#[allow(clippy::vec_init_then_push)]
async fn get_config(State(state): State<AppState>) -> Json<ConfigResponse> {
let registry = state.provider_registry.read().await;

View File

@@ -53,6 +53,7 @@ pub fn router() -> Router<AppState> {
// Direct streaming
// ============================================================================
#[cfg_attr(not(feature = "local-files"), allow(unused_variables))]
async fn stream_file(
State(state): State<AppState>,
Path(encoded_id): Path<String>,
@@ -131,7 +132,7 @@ async fn stream_file(
);
}
return builder.body(body).map_err(|e| ApiError::internal(e.to_string()));
builder.body(body).map_err(|e| ApiError::internal(e.to_string()))
}
#[cfg(not(feature = "local-files"))]
@@ -316,6 +317,7 @@ async fn clear_transcode_cache(
// Helpers
// ============================================================================
#[cfg(feature = "local-files")]
fn content_type_for_ext(ext: &str) -> &'static str {
match ext {
"mp4" | "m4v" => "video/mp4",
@@ -327,6 +329,7 @@ fn content_type_for_ext(ext: &str) -> &'static str {
}
}
#[cfg(feature = "local-files")]
fn parse_range(range: &str, file_size: u64) -> Option<(u64, u64)> {
let range = range.strip_prefix("bytes=")?;
let (start_str, end_str) = range.split_once('-')?;

View File

@@ -92,12 +92,12 @@ mod tests {
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use domain::value_objects::{ChannelId, ContentType, UserId};
use domain::{
Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry,
MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities, ScheduleEngineService,
ScheduleRepository, SeriesSummary, StreamQuality, StreamingProtocol,
MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities,
ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality,
};
use domain::value_objects::{ChannelId, ContentType, UserId};
use uuid::Uuid;
// ── Mocks ─────────────────────────────────────────────────────────────────
@@ -142,14 +142,20 @@ mod tests {
) -> DomainResult<Option<GeneratedSchedule>> {
Ok(None)
}
async fn find_latest(&self, _channel_id: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
async fn find_latest(
&self,
_channel_id: ChannelId,
) -> DomainResult<Option<GeneratedSchedule>> {
Ok(self.latest.clone())
}
async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> {
self.saved.lock().unwrap().push(schedule.clone());
Ok(())
}
async fn find_playback_history(&self, _channel_id: ChannelId) -> DomainResult<Vec<PlaybackRecord>> {
async fn find_playback_history(
&self,
_channel_id: ChannelId,
) -> DomainResult<Vec<PlaybackRecord>> {
Ok(vec![])
}
async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> {
@@ -161,13 +167,21 @@ mod tests {
#[async_trait]
impl IProviderRegistry for MockRegistry {
async fn fetch_items(&self, _provider_id: &str, _filter: &MediaFilter) -> DomainResult<Vec<MediaItem>> {
async fn fetch_items(
&self,
_provider_id: &str,
_filter: &MediaFilter,
) -> DomainResult<Vec<MediaItem>> {
Ok(vec![])
}
async fn fetch_by_id(&self, _item_id: &MediaItemId) -> DomainResult<Option<MediaItem>> {
Ok(None)
}
async fn get_stream_url(&self, _item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult<String> {
async fn get_stream_url(
&self,
_item_id: &MediaItemId,
_quality: &StreamQuality,
) -> DomainResult<String> {
unimplemented!()
}
fn provider_ids(&self) -> Vec<String> {
@@ -182,10 +196,18 @@ mod tests {
async fn list_collections(&self, _provider_id: &str) -> DomainResult<Vec<Collection>> {
unimplemented!()
}
async fn list_series(&self, _provider_id: &str, _collection_id: Option<&str>) -> DomainResult<Vec<SeriesSummary>> {
async fn list_series(
&self,
_provider_id: &str,
_collection_id: Option<&str>,
) -> DomainResult<Vec<SeriesSummary>> {
unimplemented!()
}
async fn list_genres(&self, _provider_id: &str, _content_type: Option<&ContentType>) -> DomainResult<Vec<String>> {
async fn list_genres(
&self,
_provider_id: &str,
_content_type: Option<&ContentType>,
) -> DomainResult<Vec<String>> {
unimplemented!()
}
}
@@ -226,9 +248,12 @@ mod tests {
async fn test_no_schedule_generates_from_now() {
let ch = make_channel();
let saved = Arc::new(Mutex::new(vec![]));
let channel_repo: Arc<dyn ChannelRepository> = Arc::new(MockChannelRepo { channels: vec![ch] });
let schedule_repo: Arc<dyn ScheduleRepository> =
Arc::new(MockScheduleRepo { latest: None, saved: saved.clone() });
let channel_repo: Arc<dyn ChannelRepository> =
Arc::new(MockChannelRepo { channels: vec![ch] });
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
latest: None,
saved: saved.clone(),
});
let engine = make_engine(channel_repo.clone(), schedule_repo);
let (event_tx, _) = tokio::sync::broadcast::channel(8);
@@ -246,9 +271,12 @@ mod tests {
let valid_until = Utc::now() + Duration::hours(25);
let schedule = make_schedule(ch.id, valid_until);
let saved = Arc::new(Mutex::new(vec![]));
let channel_repo: Arc<dyn ChannelRepository> = Arc::new(MockChannelRepo { channels: vec![ch] });
let schedule_repo: Arc<dyn ScheduleRepository> =
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let channel_repo: Arc<dyn ChannelRepository> =
Arc::new(MockChannelRepo { channels: vec![ch] });
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
latest: Some(schedule),
saved: saved.clone(),
});
let engine = make_engine(channel_repo.clone(), schedule_repo);
let (event_tx, _) = tokio::sync::broadcast::channel(8);
@@ -263,9 +291,12 @@ mod tests {
let valid_until = Utc::now() + Duration::hours(20);
let schedule = make_schedule(ch.id, valid_until);
let saved = Arc::new(Mutex::new(vec![]));
let channel_repo: Arc<dyn ChannelRepository> = Arc::new(MockChannelRepo { channels: vec![ch] });
let schedule_repo: Arc<dyn ScheduleRepository> =
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let channel_repo: Arc<dyn ChannelRepository> =
Arc::new(MockChannelRepo { channels: vec![ch] });
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
latest: Some(schedule),
saved: saved.clone(),
});
let engine = make_engine(channel_repo.clone(), schedule_repo);
let (event_tx, _) = tokio::sync::broadcast::channel(8);
@@ -282,9 +313,12 @@ mod tests {
let valid_until = Utc::now() - Duration::hours(1);
let schedule = make_schedule(ch.id, valid_until);
let saved = Arc::new(Mutex::new(vec![]));
let channel_repo: Arc<dyn ChannelRepository> = Arc::new(MockChannelRepo { channels: vec![ch] });
let schedule_repo: Arc<dyn ScheduleRepository> =
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let channel_repo: Arc<dyn ChannelRepository> =
Arc::new(MockChannelRepo { channels: vec![ch] });
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
latest: Some(schedule),
saved: saved.clone(),
});
let engine = make_engine(channel_repo.clone(), schedule_repo);
let (event_tx, _) = tokio::sync::broadcast::channel(8);

View File

@@ -54,6 +54,7 @@ pub struct AppState {
}
impl AppState {
#[allow(clippy::too_many_arguments)]
pub async fn new(
user_service: UserService,
channel_service: ChannelService,

View File

@@ -182,8 +182,9 @@ async fn post_webhook(
let mut req = client.post(url).body(body);
let mut has_content_type = false;
if let Some(h) = headers_json {
if let Ok(map) = serde_json::from_str::<serde_json::Map<String, Value>>(h) {
if let Some(h) = headers_json
&& let Ok(map) = serde_json::from_str::<serde_json::Map<String, Value>>(h)
{
for (k, v) in &map {
if k.to_lowercase() == "content-type" {
has_content_type = true;
@@ -193,7 +194,6 @@ async fn post_webhook(
}
}
}
}
if !has_content_type {
req = req.header("Content-Type", "application/json");

View File

@@ -229,11 +229,11 @@ impl ScheduleEngineService {
}
/// Return all slots that overlap the given time window — the EPG data.
pub fn get_epg<'a>(
schedule: &'a GeneratedSchedule,
pub fn get_epg(
schedule: &GeneratedSchedule,
from: DateTime<Utc>,
until: DateTime<Utc>,
) -> Vec<&'a ScheduledSlot> {
) -> Vec<&ScheduledSlot> {
schedule
.slots
.iter()
@@ -245,6 +245,7 @@ impl ScheduleEngineService {
// Block resolution
// -------------------------------------------------------------------------
#[allow(clippy::too_many_arguments)]
async fn resolve_block(
&self,
block: &ProgrammingBlock,
@@ -310,6 +311,7 @@ impl ScheduleEngineService {
///
/// `last_item_id` is the ID of the last item scheduled in this block in the
/// previous generation. Used only by `Sequential` for series continuity.
#[allow(clippy::too_many_arguments)]
async fn resolve_algorithmic(
&self,
provider_id: &str,

View File

@@ -179,11 +179,7 @@ impl JwtValidator {
/// Get the user ID (subject) from a token without full validation
/// Useful for logging/debugging, but should not be trusted for auth
pub fn decode_unverified(&self, token: &str) -> Result<JwtClaims, JwtError> {
let mut validation = Validation::new(Algorithm::HS256);
validation.insecure_disable_signature_validation();
validation.validate_exp = false;
let token_data = decode::<JwtClaims>(token, &self.decoding_key, &validation)
let token_data = jsonwebtoken::dangerous::insecure_decode::<JwtClaims>(token)
.map_err(|_| JwtError::InvalidFormat)?;
Ok(token_data.claims)

View File

@@ -376,14 +376,13 @@ impl IMediaProvider for JellyfinMediaProvider {
if resp.status().is_success() {
let info: JellyfinPlaybackInfoResponse = resp.json().await
.map_err(|e| DomainError::InfrastructureError(format!("PlaybackInfo parse failed: {e}")))?;
if let Some(src) = info.media_sources.first() {
if src.supports_direct_stream {
if let Some(rel_url) = &src.direct_stream_url {
if let Some(src) = info.media_sources.first()
&& src.supports_direct_stream
&& let Some(rel_url) = &src.direct_stream_url
{
return Ok(format!("{}{}&api_key={}", self.config.base_url, rel_url, self.config.api_key));
}
}
}
}
// Fallback: HLS at 8 Mbps
Ok(self.hls_url(item_id, 8_000_000))
}

View File

@@ -86,11 +86,9 @@ impl IMediaProvider for LocalFilesProvider {
} else {
ContentType::Movie
};
if let Some(ref ct) = filter.content_type {
if &content_type != ct {
if let Some(ref ct) = filter.content_type && &content_type != ct {
return None;
}
}
// collections: match against top_dir
if !filter.collections.is_empty() && !filter.collections.contains(&item.top_dir) {
@@ -117,23 +115,17 @@ impl IMediaProvider for LocalFilesProvider {
}
// duration bounds
if let Some(min) = filter.min_duration_secs {
if item.duration_secs < min {
if let Some(min) = filter.min_duration_secs && item.duration_secs < min {
return None;
}
}
if let Some(max) = filter.max_duration_secs {
if item.duration_secs > max {
if let Some(max) = filter.max_duration_secs && item.duration_secs > max {
return None;
}
}
// search_term: case-insensitive substring in title
if let Some(ref q) = filter.search_term {
if !item.title.to_lowercase().contains(&q.to_lowercase()) {
if let Some(ref q) = filter.search_term && !item.title.to_lowercase().contains(&q.to_lowercase()) {
return None;
}
}
Some(to_media_item(id, &item))
})

View File

@@ -171,19 +171,17 @@ impl TranscodeManager {
continue;
}
let playlist = path.join("playlist.m3u8");
if let Ok(meta) = tokio::fs::metadata(&playlist).await {
if let Ok(modified) = meta.modified() {
if let Ok(age) = now.duration_since(modified) {
if age > ttl {
if let Ok(meta) = tokio::fs::metadata(&playlist).await
&& let Ok(modified) = meta.modified()
&& let Ok(age) = now.duration_since(modified)
&& age > ttl
{
warn!("cleanup: removing stale transcode {:?}", path);
let _ = tokio::fs::remove_dir_all(&path).await;
}
}
}
}
}
}
}
// ============================================================================
// FFmpeg helper

View File

@@ -9,5 +9,5 @@ pub fn json_err(e: serde_json::Error) -> String {
}
pub fn ok_json<T: serde::Serialize>(value: &T) -> String {
serde_json::to_string(value).unwrap_or_else(|e| json_err(e))
serde_json::to_string(value).unwrap_or_else(json_err)
}

View File

@@ -87,7 +87,7 @@ async fn main() -> anyhow::Result<()> {
#[cfg(feature = "local-files")]
if let Some(dir) = std::env::var("LOCAL_FILES_DIR").ok().map(std::path::PathBuf::from) {
if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool {
let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool;
let base_url = std::env::var("BASE_URL")
.unwrap_or_else(|_| "http://localhost:3000".to_string());
let lf_cfg = infra::LocalFilesConfig {
@@ -101,7 +101,6 @@ async fn main() -> anyhow::Result<()> {
tokio::spawn(async move { scan_idx.rescan().await; });
registry.register("local", Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg, None)));
}
}
if registry.is_empty() {
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.");