diff --git a/k-tv-backend/api/src/dto.rs b/k-tv-backend/api/src/dto.rs index f468fce..30aa8ac 100644 --- a/k-tv-backend/api/src/dto.rs +++ b/k-tv-backend/api/src/dto.rs @@ -42,10 +42,20 @@ pub struct TokenResponse { pub expires_in: u64, } +/// Per-provider info returned by `GET /config`. +#[derive(Debug, Serialize)] +pub struct ProviderInfo { + pub id: String, + pub capabilities: domain::ProviderCapabilities, +} + /// System configuration response #[derive(Debug, Serialize)] pub struct ConfigResponse { pub allow_registration: bool, + /// All registered providers with their capabilities. + pub providers: Vec, + /// Capabilities of the primary provider — kept for backward compatibility. pub provider_capabilities: domain::ProviderCapabilities, } diff --git a/k-tv-backend/api/src/main.rs b/k-tv-backend/api/src/main.rs index f0cf5bb..57556ab 100644 --- a/k-tv-backend/api/src/main.rs +++ b/k-tv-backend/api/src/main.rs @@ -10,7 +10,7 @@ use axum::http::{HeaderName, HeaderValue}; use std::sync::Arc; use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer}; -use domain::{ChannelService, IMediaProvider, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService}; +use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService}; use infra::factory::{build_channel_repository, build_schedule_repository, build_user_repository}; use infra::run_migrations; use k_core::http::server::{ServerConfig, apply_standard_middleware}; @@ -72,11 +72,11 @@ async fn main() -> anyhow::Result<()> { let user_service = UserService::new(user_repo); let channel_service = ChannelService::new(channel_repo.clone()); - // Build media provider — Jellyfin → local-files → noop, first match wins. + // Build provider registry — all configured providers are registered simultaneously. #[cfg(feature = "local-files")] let mut local_index: Option> = None; - let mut maybe_provider: Option> = None; + let mut registry = infra::ProviderRegistry::new(); #[cfg(feature = "jellyfin")] if let (Some(base_url), Some(api_key), Some(user_id)) = ( @@ -85,7 +85,7 @@ async fn main() -> anyhow::Result<()> { &config.jellyfin_user_id, ) { tracing::info!("Media provider: Jellyfin at {}", base_url); - maybe_provider = Some(Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig { + registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig { base_url: base_url.clone(), api_key: api_key.clone(), user_id: user_id.clone(), @@ -93,35 +93,33 @@ async fn main() -> anyhow::Result<()> { } #[cfg(feature = "local-files")] - if maybe_provider.is_none() { - if let Some(dir) = &config.local_files_dir { - if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool { - tracing::info!("Media provider: local files at {:?}", dir); - let lf_cfg = infra::LocalFilesConfig { - root_dir: dir.clone(), - base_url: config.base_url.clone(), - }; - let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); - local_index = Some(Arc::clone(&idx)); - let scan_idx = Arc::clone(&idx); - tokio::spawn(async move { scan_idx.rescan().await; }); - maybe_provider = Some(Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg))); - } else { - tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR"); - } + if let Some(dir) = &config.local_files_dir { + if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool { + tracing::info!("Media provider: local files at {:?}", dir); + let lf_cfg = infra::LocalFilesConfig { + root_dir: dir.clone(), + base_url: config.base_url.clone(), + }; + let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); + local_index = Some(Arc::clone(&idx)); + let scan_idx = Arc::clone(&idx); + tokio::spawn(async move { scan_idx.rescan().await; }); + registry.register("local", Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg))); + } else { + tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR"); } } - let media_provider: Arc = maybe_provider.unwrap_or_else(|| { - tracing::warn!( - "No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR." - ); - Arc::new(NoopMediaProvider) - }); + if registry.is_empty() { + tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR."); + registry.register("noop", Arc::new(NoopMediaProvider)); + } + + let registry = Arc::new(registry); let bg_channel_repo = channel_repo.clone(); let schedule_engine = ScheduleEngineService::new( - Arc::clone(&media_provider), + Arc::clone(®istry) as Arc, channel_repo, schedule_repo, ); @@ -131,7 +129,7 @@ async fn main() -> anyhow::Result<()> { user_service, channel_service, schedule_engine, - media_provider, + registry, config.clone(), ) .await?; diff --git a/k-tv-backend/api/src/routes/config.rs b/k-tv-backend/api/src/routes/config.rs index 0adc712..0bed44c 100644 --- a/k-tv-backend/api/src/routes/config.rs +++ b/k-tv-backend/api/src/routes/config.rs @@ -1,6 +1,7 @@ use axum::{Json, Router, extract::State, routing::get}; +use domain::{IProviderRegistry as _, ProviderCapabilities, StreamingProtocol}; -use crate::dto::ConfigResponse; +use crate::dto::{ConfigResponse, ProviderInfo}; use crate::state::AppState; pub fn router() -> Router { @@ -8,8 +9,35 @@ pub fn router() -> Router { } async fn get_config(State(state): State) -> Json { + let providers: Vec = state + .provider_registry + .provider_ids() + .into_iter() + .filter_map(|id| { + state.provider_registry.capabilities(&id).map(|caps| ProviderInfo { + id: id.clone(), + capabilities: caps, + }) + }) + .collect(); + + let primary_capabilities = state + .provider_registry + .capabilities(state.provider_registry.primary_id()) + .unwrap_or(ProviderCapabilities { + collections: false, + series: false, + genres: false, + tags: false, + decade: false, + search: false, + streaming_protocol: StreamingProtocol::DirectFile, + rescan: false, + }); + Json(ConfigResponse { allow_registration: state.config.allow_registration, - provider_capabilities: state.media_provider.capabilities(), + providers, + provider_capabilities: primary_capabilities, }) } diff --git a/k-tv-backend/api/src/routes/library.rs b/k-tv-backend/api/src/routes/library.rs index 84fcaad..cc3c460 100644 --- a/k-tv-backend/api/src/routes/library.rs +++ b/k-tv-backend/api/src/routes/library.rs @@ -14,6 +14,7 @@ use axum::{ extract::{Query, RawQuery, State}, routing::get, }; +use domain::IProviderRegistry as _; use serde::{Deserialize, Serialize}; use domain::{Collection, ContentType, MediaFilter, SeriesSummary}; @@ -93,10 +94,18 @@ struct LibraryItemResponse { // Query params // ============================================================================ +#[derive(Debug, Deserialize)] +struct CollectionsQuery { + /// Provider key to query (default: primary). + provider: Option, +} + #[derive(Debug, Deserialize)] struct SeriesQuery { /// Scope results to a specific collection (provider library ID). collection: Option, + /// Provider key to query (default: primary). + provider: Option, } #[derive(Debug, Deserialize)] @@ -104,6 +113,8 @@ struct GenresQuery { /// Limit genres to a content type: "movie", "episode", or "short". #[serde(rename = "type")] content_type: Option, + /// Provider key to query (default: primary). + provider: Option, } #[derive(Debug, Default, Deserialize)] @@ -125,6 +136,8 @@ struct ItemsQuery { /// Applies the same ordering the schedule engine would use so the preview /// reflects what will actually be scheduled. strategy: Option, + /// Provider key to query (default: primary). + provider: Option, } // ============================================================================ @@ -135,13 +148,16 @@ struct ItemsQuery { async fn list_collections( State(state): State, CurrentUser(_user): CurrentUser, + Query(params): Query, ) -> Result>, ApiError> { - if !state.media_provider.capabilities().collections { - return Err(ApiError::not_implemented( - "collections not supported by this provider", - )); + let provider_id = params.provider.as_deref().unwrap_or(""); + let caps = state.provider_registry.capabilities(provider_id).ok_or_else(|| { + ApiError::validation(format!("Unknown provider '{}'", provider_id)) + })?; + if !caps.collections { + return Err(ApiError::not_implemented("collections not supported by this provider")); } - let collections = state.media_provider.list_collections().await?; + let collections = state.provider_registry.list_collections(provider_id).await?; Ok(Json(collections.into_iter().map(Into::into).collect())) } @@ -151,14 +167,16 @@ async fn list_series( CurrentUser(_user): CurrentUser, Query(params): Query, ) -> Result>, ApiError> { - if !state.media_provider.capabilities().series { - return Err(ApiError::not_implemented( - "series not supported by this provider", - )); + let provider_id = params.provider.as_deref().unwrap_or(""); + let caps = state.provider_registry.capabilities(provider_id).ok_or_else(|| { + ApiError::validation(format!("Unknown provider '{}'", provider_id)) + })?; + if !caps.series { + return Err(ApiError::not_implemented("series not supported by this provider")); } let series = state - .media_provider - .list_series(params.collection.as_deref()) + .provider_registry + .list_series(provider_id, params.collection.as_deref()) .await?; Ok(Json(series.into_iter().map(Into::into).collect())) } @@ -169,13 +187,15 @@ async fn list_genres( CurrentUser(_user): CurrentUser, Query(params): Query, ) -> Result>, ApiError> { - if !state.media_provider.capabilities().genres { - return Err(ApiError::not_implemented( - "genres not supported by this provider", - )); + let provider_id = params.provider.as_deref().unwrap_or(""); + let caps = state.provider_registry.capabilities(provider_id).ok_or_else(|| { + ApiError::validation(format!("Unknown provider '{}'", provider_id)) + })?; + if !caps.genres { + return Err(ApiError::not_implemented("genres not supported by this provider")); } let ct = parse_content_type(params.content_type.as_deref())?; - let genres = state.media_provider.list_genres(ct.as_ref()).await?; + let genres = state.provider_registry.list_genres(provider_id, ct.as_ref()).await?; Ok(Json(genres)) } @@ -195,6 +215,8 @@ async fn search_items( .unwrap_or_default(); let limit = params.limit.unwrap_or(50).min(200); + let provider_id = params.provider.as_deref().unwrap_or(""); + let filter = MediaFilter { content_type: parse_content_type(params.content_type.as_deref())?, search_term: params.q, @@ -206,7 +228,7 @@ async fn search_items( ..Default::default() }; - let mut items = state.media_provider.fetch_items(&filter).await?; + let mut items = state.provider_registry.fetch_items(provider_id, &filter).await?; // Apply the same ordering the schedule engine uses so the preview reflects // what will actually be scheduled rather than raw provider order. diff --git a/k-tv-backend/api/src/state.rs b/k-tv-backend/api/src/state.rs index 0e70185..224c624 100644 --- a/k-tv-backend/api/src/state.rs +++ b/k-tv-backend/api/src/state.rs @@ -11,14 +11,14 @@ use infra::auth::oidc::OidcService; use std::sync::Arc; use crate::config::Config; -use domain::{ChannelService, IMediaProvider, ScheduleEngineService, UserService}; +use domain::{ChannelService, ScheduleEngineService, UserService}; #[derive(Clone)] pub struct AppState { pub user_service: Arc, pub channel_service: Arc, pub schedule_engine: Arc, - pub media_provider: Arc, + pub provider_registry: Arc, pub cookie_key: Key, #[cfg(feature = "auth-oidc")] pub oidc_service: Option>, @@ -35,7 +35,7 @@ impl AppState { user_service: UserService, channel_service: ChannelService, schedule_engine: ScheduleEngineService, - media_provider: Arc, + provider_registry: Arc, config: Config, ) -> anyhow::Result { let cookie_key = Key::derive_from(config.cookie_secret.as_bytes()); @@ -101,7 +101,7 @@ impl AppState { user_service: Arc::new(user_service), channel_service: Arc::new(channel_service), schedule_engine: Arc::new(schedule_engine), - media_provider, + provider_registry, cookie_key, #[cfg(feature = "auth-oidc")] oidc_service, diff --git a/k-tv-backend/domain/src/entities.rs b/k-tv-backend/domain/src/entities.rs index 70922aa..b7998a4 100644 --- a/k-tv-backend/domain/src/entities.rs +++ b/k-tv-backend/domain/src/entities.rs @@ -214,7 +214,7 @@ impl ProgrammingBlock { name: name.into(), start_time, duration_mins, - content: BlockContent::Algorithmic { filter, strategy }, + content: BlockContent::Algorithmic { filter, strategy, provider_id: String::new() }, loop_on_finish: true, ignore_recycle_policy: false, access_mode: AccessMode::default(), @@ -233,7 +233,7 @@ impl ProgrammingBlock { name: name.into(), start_time, duration_mins, - content: BlockContent::Manual { items }, + content: BlockContent::Manual { items, provider_id: String::new() }, loop_on_finish: true, ignore_recycle_policy: false, access_mode: AccessMode::default(), @@ -247,11 +247,21 @@ impl ProgrammingBlock { #[serde(tag = "type", rename_all = "snake_case")] pub enum BlockContent { /// The user hand-picked specific items in a specific order. - Manual { items: Vec }, + /// Item IDs are prefixed with the provider key (e.g. `"jellyfin::abc123"`) + /// so the registry can route each fetch to the correct provider. + Manual { + items: Vec, + /// Registry key of the provider these items come from. Empty string = primary. + #[serde(default)] + provider_id: String, + }, /// The engine selects items from the provider using the given filter and strategy. Algorithmic { filter: MediaFilter, strategy: FillStrategy, + /// Registry key of the provider to query. Empty string = primary. + #[serde(default)] + provider_id: String, }, } diff --git a/k-tv-backend/domain/src/lib.rs b/k-tv-backend/domain/src/lib.rs index 4ef7e41..e8b7f87 100644 --- a/k-tv-backend/domain/src/lib.rs +++ b/k-tv-backend/domain/src/lib.rs @@ -14,7 +14,7 @@ pub mod value_objects; // Re-export commonly used types pub use entities::*; pub use errors::{DomainError, DomainResult}; -pub use ports::{Collection, IMediaProvider, ProviderCapabilities, SeriesSummary, StreamingProtocol, StreamQuality}; +pub use ports::{Collection, IMediaProvider, IProviderRegistry, ProviderCapabilities, SeriesSummary, StreamingProtocol, StreamQuality}; pub use repositories::*; pub use iptv::{generate_m3u, generate_xmltv}; pub use services::{ChannelService, ScheduleEngineService, UserService}; diff --git a/k-tv-backend/domain/src/ports.rs b/k-tv-backend/domain/src/ports.rs index 7ac6e09..9bd9a5f 100644 --- a/k-tv-backend/domain/src/ports.rs +++ b/k-tv-backend/domain/src/ports.rs @@ -162,3 +162,47 @@ pub trait IMediaProvider: Send + Sync { )) } } + +// ============================================================================ +// Registry port +// ============================================================================ + +/// Port for routing media operations across multiple named providers. +/// +/// The registry holds all configured providers (Jellyfin, local files, …) and +/// dispatches each call to the right one. Item IDs are prefixed with the +/// provider key (e.g. `"jellyfin::abc123"`, `"local::base64path"`) so every +/// fetch and stream call is self-routing. An empty prefix falls back to the +/// primary (first-registered) provider for backward compatibility. +#[async_trait] +pub trait IProviderRegistry: Send + Sync { + /// Fetch items from a named provider (used by Algorithmic blocks). + /// Empty `provider_id` uses the primary provider. + /// Returned item IDs are stamped with the provider prefix. + async fn fetch_items(&self, provider_id: &str, filter: &MediaFilter) -> DomainResult>; + + /// Fetch a single item by its (possibly prefixed) ID. + /// Routes to the correct provider by parsing the prefix. + async fn fetch_by_id(&self, item_id: &MediaItemId) -> DomainResult>; + + /// Get a playback URL. Routes via prefix in `item_id`. + async fn get_stream_url(&self, item_id: &MediaItemId, quality: &StreamQuality) -> DomainResult; + + /// List all registered provider keys in registration order. + fn provider_ids(&self) -> Vec; + + /// Key of the primary (first-registered) provider. + fn primary_id(&self) -> &str; + + /// Capability matrix for a specific provider. Returns `None` if the key is unknown. + fn capabilities(&self, provider_id: &str) -> Option; + + /// List collections for a provider. Empty `provider_id` = primary. + async fn list_collections(&self, provider_id: &str) -> DomainResult>; + + /// List series for a provider. Empty `provider_id` = primary. + async fn list_series(&self, provider_id: &str, collection_id: Option<&str>) -> DomainResult>; + + /// List genres for a provider. Empty `provider_id` = primary. + async fn list_genres(&self, provider_id: &str, content_type: Option<&ContentType>) -> DomainResult>; +} diff --git a/k-tv-backend/domain/src/services/schedule/mod.rs b/k-tv-backend/domain/src/services/schedule/mod.rs index 7ad9936..0457d1f 100644 --- a/k-tv-backend/domain/src/services/schedule/mod.rs +++ b/k-tv-backend/domain/src/services/schedule/mod.rs @@ -10,7 +10,7 @@ use crate::entities::{ ScheduledSlot, }; use crate::errors::{DomainError, DomainResult}; -use crate::ports::{IMediaProvider, StreamQuality}; +use crate::ports::{IProviderRegistry, StreamQuality}; use crate::repositories::{ChannelRepository, ScheduleRepository}; use crate::value_objects::{ BlockId, ChannelId, FillStrategy, MediaFilter, MediaItemId, RecyclePolicy, @@ -26,19 +26,19 @@ mod recycle; /// `ScheduledSlot`s via the `IMediaProvider`, and applying the `RecyclePolicy` /// to avoid replaying recently aired items. pub struct ScheduleEngineService { - media_provider: Arc, + provider_registry: Arc, channel_repo: Arc, schedule_repo: Arc, } impl ScheduleEngineService { pub fn new( - media_provider: Arc, + provider_registry: Arc, channel_repo: Arc, schedule_repo: Arc, ) -> Self { Self { - media_provider, + provider_registry, channel_repo, schedule_repo, } @@ -223,9 +223,9 @@ impl ScheduleEngineService { self.schedule_repo.find_active(channel_id, at).await } - /// Delegate stream URL resolution to the configured media provider. + /// Delegate stream URL resolution to the provider registry (routes via ID prefix). pub async fn get_stream_url(&self, item_id: &MediaItemId, quality: &StreamQuality) -> DomainResult { - self.media_provider.get_stream_url(item_id, quality).await + self.provider_registry.get_stream_url(item_id, quality).await } /// Return all slots that overlap the given time window — the EPG data. @@ -256,12 +256,12 @@ impl ScheduleEngineService { last_item_id: Option<&MediaItemId>, ) -> DomainResult> { match &block.content { - BlockContent::Manual { items } => { + BlockContent::Manual { items, .. } => { self.resolve_manual(items, start, end, block.id).await } - BlockContent::Algorithmic { filter, strategy } => { + BlockContent::Algorithmic { filter, strategy, provider_id } => { self.resolve_algorithmic( - filter, strategy, start, end, history, policy, generation, + provider_id, filter, strategy, start, end, history, policy, generation, block.id, last_item_id, block.loop_on_finish, block.ignore_recycle_policy, @@ -287,7 +287,7 @@ impl ScheduleEngineService { if cursor >= end { break; } - if let Some(item) = self.media_provider.fetch_by_id(item_id).await? { + if let Some(item) = self.provider_registry.fetch_by_id(item_id).await? { let item_end = (cursor + Duration::seconds(item.duration_secs as i64)).min(end); slots.push(ScheduledSlot { @@ -312,6 +312,7 @@ impl ScheduleEngineService { /// previous generation. Used only by `Sequential` for series continuity. async fn resolve_algorithmic( &self, + provider_id: &str, filter: &MediaFilter, strategy: &FillStrategy, start: DateTime, @@ -327,7 +328,7 @@ impl ScheduleEngineService { // `candidates` — all items matching the filter, in provider order. // Kept separate from `pool` so Sequential can rotate through the full // ordered list while still honouring cooldowns. - let candidates = self.media_provider.fetch_items(filter).await?; + let candidates = self.provider_registry.fetch_items(provider_id, filter).await?; if candidates.is_empty() { return Ok(vec![]); diff --git a/k-tv-backend/infra/src/lib.rs b/k-tv-backend/infra/src/lib.rs index ef9be60..61aaf2d 100644 --- a/k-tv-backend/infra/src/lib.rs +++ b/k-tv-backend/infra/src/lib.rs @@ -17,6 +17,7 @@ pub mod auth; pub mod db; pub mod factory; pub mod jellyfin; +pub mod provider_registry; mod channel_repository; mod schedule_repository; mod user_repository; @@ -26,6 +27,7 @@ pub mod local_files; // Re-export for convenience pub use db::run_migrations; +pub use provider_registry::ProviderRegistry; #[cfg(feature = "sqlite")] pub use user_repository::SqliteUserRepository; diff --git a/k-tv-backend/infra/src/local_files/scanner.rs b/k-tv-backend/infra/src/local_files/scanner.rs index 31dcdc6..d55b8cb 100644 --- a/k-tv-backend/infra/src/local_files/scanner.rs +++ b/k-tv-backend/infra/src/local_files/scanner.rs @@ -115,9 +115,7 @@ fn extract_year(s: &str) -> Option { if !chars[i..i + 4].iter().all(|c| c.is_ascii_digit()) { continue; } - // Must start with 19 or 20. - let prefix = chars[i] as u8 * 10 + chars[i + 1] as u8 - b'0' * 11; - // Simpler: just parse and range-check. + // Parse and range-check. let s4: String = chars[i..i + 4].iter().collect(); let num: u16 = s4.parse().ok()?; if !(1900..=2099).contains(&num) { @@ -126,7 +124,6 @@ fn extract_year(s: &str) -> Option { // Word-boundary: char before and after must not be digits. let before_ok = i == 0 || !chars[i - 1].is_ascii_digit(); let after_ok = i + 4 >= n || !chars[i + 4].is_ascii_digit(); - let _ = prefix; if before_ok && after_ok { return Some(num); } diff --git a/k-tv-backend/infra/src/provider_registry.rs b/k-tv-backend/infra/src/provider_registry.rs new file mode 100644 index 0000000..038473f --- /dev/null +++ b/k-tv-backend/infra/src/provider_registry.rs @@ -0,0 +1,167 @@ +//! Provider registry — routes media operations to the correct named provider. +//! +//! Item IDs are prefixed with the provider key separated by `::`, e.g. +//! `"jellyfin::abc123"` or `"local::base64path"`. The registry strips the +//! prefix before calling the underlying provider and re-stamps returned IDs +//! so every item is self-routing throughout its lifetime. +//! +//! An empty prefix (un-prefixed IDs from old data, or new blocks with no +//! `provider_id` set) falls back to the primary (first-registered) provider. + +use std::sync::Arc; + +use async_trait::async_trait; +use domain::errors::{DomainError, DomainResult}; +use domain::ports::{ + Collection, IMediaProvider, IProviderRegistry, ProviderCapabilities, SeriesSummary, + StreamQuality, +}; +use domain::{ContentType, MediaFilter, MediaItem, MediaItemId}; + +/// Registry of named media providers. +/// +/// Providers are registered with a short key (e.g. `"jellyfin"`, `"local"`). +/// The first registered provider is the *primary* — it handles un-prefixed IDs +/// and empty `provider_id` strings for backward compatibility. +pub struct ProviderRegistry { + /// Ordered list of `(key, provider)` pairs. Order determines the primary. + providers: Vec<(String, Arc)>, +} + +impl ProviderRegistry { + pub fn new() -> Self { + Self { providers: Vec::new() } + } + + /// Register a provider under `id`. The first registered becomes the primary. + pub fn register(&mut self, id: impl Into, provider: Arc) { + self.providers.push((id.into(), provider)); + } + + pub fn is_empty(&self) -> bool { + self.providers.is_empty() + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + fn prefix_id(provider_id: &str, raw_id: &str) -> MediaItemId { + MediaItemId::new(format!("{}::{}", provider_id, raw_id)) + } + + /// Split `"provider_key::raw_id"` into `(key, raw_id)`. + /// Un-prefixed IDs return `("", full_id)` → primary provider fallback. + fn parse_prefix(id: &MediaItemId) -> (&str, &str) { + let s: &str = id.as_ref(); + match s.find("::") { + Some(pos) => (&s[..pos], &s[pos + 2..]), + None => ("", s), + } + } + + /// Resolve a provider key to the provider, defaulting to primary on empty key. + /// Returns `(resolved_key, provider)` so the caller can re-stamp IDs. + fn resolve_provider<'a>( + &'a self, + provider_id: &str, + ) -> DomainResult<(&'a str, &'a Arc)> { + if provider_id.is_empty() { + self.providers + .first() + .map(|(id, p)| (id.as_str(), p)) + .ok_or_else(|| DomainError::InfrastructureError("No providers registered".into())) + } else { + self.providers + .iter() + .find(|(id, _)| id == provider_id) + .map(|(id, p)| (id.as_str(), p)) + .ok_or_else(|| { + DomainError::InfrastructureError( + format!("Provider '{}' not found", provider_id), + ) + }) + } + } + + fn wrap_items(provider_id: &str, items: Vec) -> Vec { + items + .into_iter() + .map(|mut item| { + item.id = Self::prefix_id(provider_id, item.id.as_ref()); + item + }) + .collect() + } +} + +impl Default for ProviderRegistry { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl IProviderRegistry for ProviderRegistry { + async fn fetch_items(&self, provider_id: &str, filter: &MediaFilter) -> DomainResult> { + let (pid, provider) = self.resolve_provider(provider_id)?; + let items = provider.fetch_items(filter).await?; + Ok(Self::wrap_items(pid, items)) + } + + async fn fetch_by_id(&self, item_id: &MediaItemId) -> DomainResult> { + let (prefix, raw) = Self::parse_prefix(item_id); + let (pid, provider) = self.resolve_provider(prefix)?; + let raw_id = MediaItemId::new(raw); + let result = provider.fetch_by_id(&raw_id).await?; + Ok(result.map(|mut item| { + item.id = Self::prefix_id(pid, item.id.as_ref()); + item + })) + } + + async fn get_stream_url(&self, item_id: &MediaItemId, quality: &StreamQuality) -> DomainResult { + let (prefix, raw) = Self::parse_prefix(item_id); + let (_, provider) = self.resolve_provider(prefix)?; + let raw_id = MediaItemId::new(raw); + provider.get_stream_url(&raw_id, quality).await + } + + fn provider_ids(&self) -> Vec { + self.providers.iter().map(|(id, _)| id.clone()).collect() + } + + fn primary_id(&self) -> &str { + self.providers + .first() + .map(|(id, _)| id.as_str()) + .unwrap_or("") + } + + fn capabilities(&self, provider_id: &str) -> Option { + let target = if provider_id.is_empty() { + self.providers.first().map(|(id, _)| id.as_str())? + } else { + provider_id + }; + self.providers + .iter() + .find(|(id, _)| id == target) + .map(|(_, p)| p.capabilities()) + } + + async fn list_collections(&self, provider_id: &str) -> DomainResult> { + let (_, provider) = self.resolve_provider(provider_id)?; + provider.list_collections().await + } + + async fn list_series(&self, provider_id: &str, collection_id: Option<&str>) -> DomainResult> { + let (_, provider) = self.resolve_provider(provider_id)?; + provider.list_series(collection_id).await + } + + async fn list_genres(&self, provider_id: &str, content_type: Option<&ContentType>) -> DomainResult> { + let (_, provider) = self.resolve_provider(provider_id)?; + provider.list_genres(content_type).await + } +} diff --git a/k-tv-backend/mcp/src/main.rs b/k-tv-backend/mcp/src/main.rs index b0a170c..c271413 100644 --- a/k-tv-backend/mcp/src/main.rs +++ b/k-tv-backend/mcp/src/main.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use std::time::Duration as StdDuration; use domain::{ - ChannelService, DomainError, DomainResult, IMediaProvider, MediaFilter, MediaItemId, - ProviderCapabilities, ScheduleEngineService, StreamQuality, StreamingProtocol, UserService, + ChannelService, DomainError, DomainResult, IMediaProvider, IProviderRegistry, MediaFilter, + MediaItemId, ProviderCapabilities, ScheduleEngineService, StreamQuality, StreamingProtocol, + UserService, }; use infra::factory::{build_channel_repository, build_schedule_repository, build_user_repository}; use infra::run_migrations; @@ -69,7 +70,7 @@ async fn main() -> anyhow::Result<()> { let _user_service = UserService::new(user_repo); let channel_service = ChannelService::new(channel_repo.clone()); - let mut maybe_provider: Option> = None; + let mut registry = infra::ProviderRegistry::new(); #[cfg(feature = "jellyfin")] { @@ -78,41 +79,34 @@ async fn main() -> anyhow::Result<()> { let user_id = std::env::var("JELLYFIN_USER_ID").ok(); if let (Some(base_url), Some(api_key), Some(user_id)) = (base_url, api_key, user_id) { info!("Media provider: Jellyfin at {}", base_url); - maybe_provider = Some(Arc::new(infra::JellyfinMediaProvider::new( - infra::JellyfinConfig { - base_url, - api_key, - user_id, - }, + registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new( + infra::JellyfinConfig { base_url, api_key, user_id }, ))); } } #[cfg(feature = "local-files")] - if maybe_provider.is_none() { - if let Some(dir) = std::env::var("LOCAL_FILES_DIR").ok().map(std::path::PathBuf::from) { - if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool { - let base_url = std::env::var("BASE_URL") - .unwrap_or_else(|_| "http://localhost:3000".to_string()); - let lf_cfg = infra::LocalFilesConfig { - root_dir: dir, - base_url, - }; - let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); - let scan_idx = Arc::clone(&idx); - tokio::spawn(async move { scan_idx.rescan().await; }); - maybe_provider = Some(Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg))); - } + if let Some(dir) = std::env::var("LOCAL_FILES_DIR").ok().map(std::path::PathBuf::from) { + if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool { + let base_url = std::env::var("BASE_URL") + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + let lf_cfg = infra::LocalFilesConfig { root_dir: dir, base_url }; + let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); + let scan_idx = Arc::clone(&idx); + tokio::spawn(async move { scan_idx.rescan().await; }); + registry.register("local", Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg))); } } - let media_provider: Arc = maybe_provider.unwrap_or_else(|| { + if registry.is_empty() { tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR."); - Arc::new(NoopMediaProvider) - }); + registry.register("noop", Arc::new(NoopMediaProvider)); + } + + let registry = Arc::new(registry); let schedule_engine = ScheduleEngineService::new( - Arc::clone(&media_provider), + Arc::clone(®istry) as Arc, channel_repo, schedule_repo, ); @@ -120,7 +114,7 @@ async fn main() -> anyhow::Result<()> { let server = KTvMcpServer { channel_service: Arc::new(channel_service), schedule_engine: Arc::new(schedule_engine), - media_provider, + provider_registry: registry, owner_id, }; diff --git a/k-tv-backend/mcp/src/server.rs b/k-tv-backend/mcp/src/server.rs index 65a2060..4768869 100644 --- a/k-tv-backend/mcp/src/server.rs +++ b/k-tv-backend/mcp/src/server.rs @@ -1,8 +1,7 @@ use std::sync::Arc; use domain::{ - ChannelService, ContentType, IMediaProvider, ProgrammingBlock, ScheduleConfig, - ScheduleEngineService, + ChannelService, ContentType, ProgrammingBlock, ScheduleConfig, ScheduleEngineService, }; use rmcp::{ ServerHandler, @@ -19,7 +18,7 @@ use crate::tools::{channels, library, schedule}; pub struct KTvMcpServer { pub channel_service: Arc, pub schedule_engine: Arc, - pub media_provider: Arc, + pub provider_registry: Arc, pub owner_id: Uuid, } @@ -244,7 +243,7 @@ impl KTvMcpServer { #[tool(description = "List media collections/libraries available in the configured provider")] async fn list_collections(&self) -> String { - library::list_collections(&self.media_provider).await + library::list_collections(&self.provider_registry).await } #[tool( @@ -252,7 +251,7 @@ impl KTvMcpServer { )] async fn list_genres(&self, #[tool(aggr)] p: ListGenresParams) -> String { let ct = p.content_type.as_deref().and_then(parse_content_type); - library::list_genres(&self.media_provider, ct).await + library::list_genres(&self.provider_registry, ct).await } #[tool( @@ -261,7 +260,7 @@ impl KTvMcpServer { async fn search_media(&self, #[tool(aggr)] p: SearchMediaParams) -> String { let ct = p.content_type.as_deref().and_then(parse_content_type); library::search_media( - &self.media_provider, + &self.provider_registry, ct, p.genres.unwrap_or_default(), p.search_term, diff --git a/k-tv-backend/mcp/src/tools/library.rs b/k-tv-backend/mcp/src/tools/library.rs index 51456bf..7e0abb8 100644 --- a/k-tv-backend/mcp/src/tools/library.rs +++ b/k-tv-backend/mcp/src/tools/library.rs @@ -1,27 +1,27 @@ -use domain::{ContentType, IMediaProvider, MediaFilter}; +use domain::{ContentType, IProviderRegistry, MediaFilter}; use std::sync::Arc; use crate::error::{domain_err, ok_json}; -pub async fn list_collections(provider: &Arc) -> String { - match provider.list_collections().await { +pub async fn list_collections(registry: &Arc) -> String { + match registry.list_collections("").await { Ok(cols) => ok_json(&cols), Err(e) => domain_err(e), } } pub async fn list_genres( - provider: &Arc, + registry: &Arc, content_type: Option, ) -> String { - match provider.list_genres(content_type.as_ref()).await { + match registry.list_genres("", content_type.as_ref()).await { Ok(genres) => ok_json(&genres), Err(e) => domain_err(e), } } pub async fn search_media( - provider: &Arc, + registry: &Arc, content_type: Option, genres: Vec, search_term: Option, @@ -36,7 +36,7 @@ pub async fn search_media( collections, ..Default::default() }; - match provider.fetch_items(&filter).await { + match registry.fetch_items("", &filter).await { Ok(items) => ok_json(&items), Err(e) => domain_err(e), } diff --git a/k-tv-frontend/app/(main)/dashboard/components/edit-channel-sheet.tsx b/k-tv-frontend/app/(main)/dashboard/components/edit-channel-sheet.tsx index 0efcb69..b7f4a0b 100644 --- a/k-tv-frontend/app/(main)/dashboard/components/edit-channel-sheet.tsx +++ b/k-tv-frontend/app/(main)/dashboard/components/edit-channel-sheet.tsx @@ -19,7 +19,7 @@ import type { FillStrategy, ContentType, MediaFilter, - ProviderCapabilities, + ProviderInfo, RecyclePolicy, } from "@/lib/types"; @@ -57,10 +57,12 @@ const blockSchema = z.object({ type: z.literal("algorithmic"), filter: mediaFilterSchema, strategy: z.enum(["best_fit", "sequential", "random"]), + provider_id: z.string().optional(), }), z.object({ type: z.literal("manual"), items: z.array(z.string()), + provider_id: z.string().optional(), }), ]), loop_on_finish: z.boolean().optional(), @@ -239,7 +241,8 @@ interface AlgorithmicFilterEditorProps { errors: FieldErrors; setFilter: (patch: Partial) => void; setStrategy: (strategy: FillStrategy) => void; - capabilities?: ProviderCapabilities; + setProviderId: (id: string) => void; + providers: ProviderInfo[]; } function AlgorithmicFilterEditor({ @@ -248,16 +251,23 @@ function AlgorithmicFilterEditor({ errors, setFilter, setStrategy, - capabilities, + setProviderId, + providers, }: AlgorithmicFilterEditorProps) { const [showGenres, setShowGenres] = useState(false); - const { data: collections, isLoading: loadingCollections } = useCollections(); + const providerId = content.provider_id ?? ""; + const capabilities = providers.find((p) => p.id === providerId)?.capabilities + ?? providers[0]?.capabilities; + + const { data: collections, isLoading: loadingCollections } = useCollections(providerId || undefined); const { data: series, isLoading: loadingSeries } = useSeries(undefined, { enabled: capabilities?.series !== false, + provider: providerId || undefined, }); const { data: genreOptions } = useGenres(content.filter.content_type ?? undefined, { enabled: capabilities?.genres !== false, + provider: providerId || undefined, }); const isEpisode = content.filter.content_type === "episode"; @@ -270,6 +280,16 @@ function AlgorithmicFilterEditor({

Filter

+ {providers.length > 1 && ( + + setProviderId(v)}> + {providers.map((p) => ( + + ))} + + + )} +
{/* Preview — snapshot of current filter+strategy, only fetches on explicit click */} - +
); } @@ -448,10 +468,10 @@ interface BlockEditorProps { onChange: (block: ProgrammingBlock) => void; onRemove: () => void; onSelect: () => void; - capabilities?: ProviderCapabilities; + providers: ProviderInfo[]; } -function BlockEditor({ block, index, isSelected, color, errors, onChange, onRemove, onSelect, capabilities }: BlockEditorProps) { +function BlockEditor({ block, index, isSelected, color, errors, onChange, onRemove, onSelect, providers }: BlockEditorProps) { const [expanded, setExpanded] = useState(isSelected); const elRef = useRef(null); @@ -470,12 +490,13 @@ function BlockEditor({ block, index, isSelected, color, errors, onChange, onRemo const pfx = `blocks.${index}`; const setContentType = (type: "algorithmic" | "manual") => { + const pid = content.provider_id ?? ""; onChange({ ...block, content: type === "algorithmic" - ? { type: "algorithmic", filter: defaultFilter(), strategy: "random" } - : { type: "manual", items: [] }, + ? { type: "algorithmic", filter: defaultFilter(), strategy: "random", provider_id: pid } + : { type: "manual", items: [], provider_id: pid }, }); }; @@ -489,6 +510,10 @@ function BlockEditor({ block, index, isSelected, color, errors, onChange, onRemo onChange({ ...block, content: { ...content, strategy } }); }; + const setProviderId = (id: string) => { + onChange({ ...block, content: { ...content, provider_id: id } }); + }; + return (
{content.strategy === "sequential" && ( @@ -734,7 +760,7 @@ interface EditChannelSheetProps { ) => void; isPending: boolean; error?: string | null; - capabilities?: ProviderCapabilities; + providers?: ProviderInfo[]; } export function EditChannelSheet({ @@ -744,7 +770,7 @@ export function EditChannelSheet({ onSubmit, isPending, error, - capabilities, + providers = [], }: EditChannelSheetProps) { const [name, setName] = useState(""); const [description, setDescription] = useState(""); @@ -1044,7 +1070,7 @@ export function EditChannelSheet({ onChange={(b) => updateBlock(idx, b)} onRemove={() => removeBlock(idx)} onSelect={() => setSelectedBlockId(block.id)} - capabilities={capabilities} + providers={providers} /> ))}
diff --git a/k-tv-frontend/app/(main)/dashboard/components/filter-preview.tsx b/k-tv-frontend/app/(main)/dashboard/components/filter-preview.tsx index 6b21cf4..588b9fd 100644 --- a/k-tv-frontend/app/(main)/dashboard/components/filter-preview.tsx +++ b/k-tv-frontend/app/(main)/dashboard/components/filter-preview.tsx @@ -8,6 +8,7 @@ import type { MediaFilter, LibraryItemResponse } from "@/lib/types"; interface FilterPreviewProps { filter: MediaFilter; strategy?: string; + provider?: string; } function fmtDuration(secs: number): string { @@ -32,10 +33,10 @@ function ItemRow({ item }: { item: LibraryItemResponse }) { ); } -type Snapshot = { filter: MediaFilter; strategy?: string }; +type Snapshot = { filter: MediaFilter; strategy?: string; provider?: string }; -export function FilterPreview({ filter, strategy }: FilterPreviewProps) { - // Capture both filter and strategy at click time so edits don't silently +export function FilterPreview({ filter, strategy, provider }: FilterPreviewProps) { + // Capture filter, strategy, and provider at click time so edits don't silently // re-fetch while the user is still configuring the block. const [snapshot, setSnapshot] = useState(null); @@ -43,14 +44,16 @@ export function FilterPreview({ filter, strategy }: FilterPreviewProps) { snapshot?.filter ?? null, !!snapshot, snapshot?.strategy, + snapshot?.provider, ); - const handlePreview = () => setSnapshot({ filter: { ...filter }, strategy }); + const handlePreview = () => setSnapshot({ filter: { ...filter }, strategy, provider }); const filterChanged = snapshot !== null && (JSON.stringify(snapshot.filter) !== JSON.stringify(filter) || - snapshot.strategy !== strategy); + snapshot.strategy !== strategy || + snapshot.provider !== provider); return (
diff --git a/k-tv-frontend/app/(main)/dashboard/page.tsx b/k-tv-frontend/app/(main)/dashboard/page.tsx index 212987f..a976540 100644 --- a/k-tv-frontend/app/(main)/dashboard/page.tsx +++ b/k-tv-frontend/app/(main)/dashboard/page.tsx @@ -367,7 +367,7 @@ export default function DashboardPage() { onSubmit={handleEdit} isPending={updateChannel.isPending} error={updateChannel.error?.message} - capabilities={capabilities} + providers={config?.providers ?? []} /> api.library.collections(token!), + queryKey: ["library", "collections", provider ?? null], + queryFn: () => api.library.collections(token!, provider), enabled: !!token, staleTime: STALE, }); } /** - * List TV series, optionally scoped to a collection. + * List TV series, optionally scoped to a collection and provider. * All series are loaded upfront so the series picker can filter client-side * without a request per keystroke. */ -export function useSeries(collectionId?: string, opts?: { enabled?: boolean }) { +export function useSeries(collectionId?: string, opts?: { enabled?: boolean; provider?: string }) { const { token } = useAuthContext(); return useQuery({ - queryKey: ["library", "series", collectionId ?? null], - queryFn: () => api.library.series(token!, collectionId), + queryKey: ["library", "series", collectionId ?? null, opts?.provider ?? null], + queryFn: () => api.library.series(token!, collectionId, opts?.provider), enabled: !!token && (opts?.enabled ?? true), staleTime: STALE, }); } -/** List available genres, optionally scoped to a content type. */ -export function useGenres(contentType?: string, opts?: { enabled?: boolean }) { +/** List available genres, optionally scoped to a content type and provider. */ +export function useGenres(contentType?: string, opts?: { enabled?: boolean; provider?: string }) { const { token } = useAuthContext(); return useQuery({ - queryKey: ["library", "genres", contentType ?? null], - queryFn: () => api.library.genres(token!, contentType), + queryKey: ["library", "genres", contentType ?? null, opts?.provider ?? null], + queryFn: () => api.library.genres(token!, contentType, opts?.provider), enabled: !!token && (opts?.enabled ?? true), staleTime: STALE, }); @@ -64,11 +64,12 @@ export function useLibraryItems( filter: Pick | null, enabled: boolean, strategy?: string, + provider?: string, ) { const { token } = useAuthContext(); return useQuery({ - queryKey: ["library", "items", filter, strategy ?? null], - queryFn: () => api.library.items(token!, filter!, 30, strategy), + queryKey: ["library", "items", filter, strategy ?? null, provider ?? null], + queryFn: () => api.library.items(token!, filter!, 30, strategy, provider), enabled: !!token && enabled && !!filter, staleTime: 2 * 60 * 1000, }); diff --git a/k-tv-frontend/lib/api.ts b/k-tv-frontend/lib/api.ts index acebc96..bee43ce 100644 --- a/k-tv-frontend/lib/api.ts +++ b/k-tv-frontend/lib/api.ts @@ -108,19 +108,25 @@ export const api = { }, library: { - collections: (token: string) => - request("/library/collections", { token }), + collections: (token: string, provider?: string) => { + const params = new URLSearchParams(); + if (provider) params.set("provider", provider); + const qs = params.toString(); + return request(`/library/collections${qs ? `?${qs}` : ""}`, { token }); + }, - series: (token: string, collectionId?: string) => { + series: (token: string, collectionId?: string, provider?: string) => { const params = new URLSearchParams(); if (collectionId) params.set("collection", collectionId); + if (provider) params.set("provider", provider); const qs = params.toString(); return request(`/library/series${qs ? `?${qs}` : ""}`, { token }); }, - genres: (token: string, contentType?: string) => { + genres: (token: string, contentType?: string, provider?: string) => { const params = new URLSearchParams(); if (contentType) params.set("type", contentType); + if (provider) params.set("provider", provider); const qs = params.toString(); return request(`/library/genres${qs ? `?${qs}` : ""}`, { token }); }, @@ -130,6 +136,7 @@ export const api = { filter: Pick, limit = 50, strategy?: string, + provider?: string, ) => { const params = new URLSearchParams(); if (filter.search_term) params.set("q", filter.search_term); @@ -138,6 +145,7 @@ export const api = { if (filter.collections?.[0]) params.set("collection", filter.collections[0]); params.set("limit", String(limit)); if (strategy) params.set("strategy", strategy); + if (provider) params.set("provider", provider); return request(`/library/items?${params}`, { token }); }, }, diff --git a/k-tv-frontend/lib/types.ts b/k-tv-frontend/lib/types.ts index 49ba5d8..d418162 100644 --- a/k-tv-frontend/lib/types.ts +++ b/k-tv-frontend/lib/types.ts @@ -57,8 +57,8 @@ export interface RecyclePolicy { } export type BlockContent = - | { type: "algorithmic"; filter: MediaFilter; strategy: FillStrategy } - | { type: "manual"; items: string[] }; + | { type: "algorithmic"; filter: MediaFilter; strategy: FillStrategy; provider_id?: string } + | { type: "manual"; items: string[]; provider_id?: string }; export interface ProgrammingBlock { id: string; @@ -95,8 +95,16 @@ export interface ProviderCapabilities { rescan: boolean; } +export interface ProviderInfo { + id: string; + capabilities: ProviderCapabilities; +} + export interface ConfigResponse { allow_registration: boolean; + /** All registered providers. Added in multi-provider update. */ + providers: ProviderInfo[]; + /** Primary provider capabilities — kept for backward compat. */ provider_capabilities: ProviderCapabilities; }