use async_trait::async_trait; use domain::{ Collection, ContentType, DomainError, DomainResult, IMediaProvider, MediaFilter, MediaItem, MediaItemId, ProviderCapabilities, SeriesSummary, StreamQuality, StreamingProtocol, }; use super::config::JellyfinConfig; use super::mapping::{map_jellyfin_item, TICKS_PER_SEC}; use super::models::{jellyfin_item_type, JellyfinItemsResponse, JellyfinPlaybackInfoResponse}; pub struct JellyfinMediaProvider { pub(super) client: reqwest::Client, pub(super) config: JellyfinConfig, } impl JellyfinMediaProvider { pub fn new(config: JellyfinConfig) -> Self { Self { client: reqwest::Client::new(), config: JellyfinConfig { base_url: config.base_url.trim_end_matches('/').to_string(), ..config }, } } /// Inner fetch: applies all filter fields plus an optional series name override. async fn fetch_items_for_series( &self, filter: &MediaFilter, series_name: Option<&str>, ) -> DomainResult> { let url = format!( "{}/Users/{}/Items", self.config.base_url, self.config.user_id ); let mut params: Vec<(&str, String)> = vec![ ("Recursive", "true".into()), ("Fields", "Genres,Tags,RunTimeTicks,ProductionYear,Overview".into()), ]; if let Some(ct) = &filter.content_type { params.push(("IncludeItemTypes", jellyfin_item_type(ct).into())); } if !filter.genres.is_empty() { params.push(("Genres", filter.genres.join("|"))); } if let Some(decade) = filter.decade { params.push(("MinYear", decade.to_string())); params.push(("MaxYear", (decade + 9).to_string())); } if !filter.tags.is_empty() { params.push(("Tags", filter.tags.join("|"))); } if let Some(min) = filter.min_duration_secs { params.push(("MinRunTimeTicks", (min as i64 * TICKS_PER_SEC).to_string())); } if let Some(max) = filter.max_duration_secs { params.push(("MaxRunTimeTicks", (max as i64 * TICKS_PER_SEC).to_string())); } if let Some(name) = series_name { // Series-level targeting: skip ParentId so the show is found regardless // of which library it lives in. SeriesName is already precise enough. params.push(("SeriesName", name.to_string())); // Return episodes in chronological order when a specific series is // requested — season first, then episode within the season. params.push(("SortBy", "ParentIndexNumber,IndexNumber".into())); params.push(("SortOrder", "Ascending".into())); } else { // No series filter — scope to the collection (library) if one is set. if let Some(parent_id) = filter.collections.first() { params.push(("ParentId", parent_id.clone())); } } if let Some(q) = &filter.search_term { params.push(("SearchTerm", q.clone())); } let response = self .client .get(&url) .header("X-Emby-Token", &self.config.api_key) .query(¶ms) .send() .await .map_err(|e| { DomainError::InfrastructureError(format!("Jellyfin request failed: {}", e)) })?; if !response.status().is_success() { return Err(DomainError::InfrastructureError(format!( "Jellyfin returned HTTP {}", response.status() ))); } let body: JellyfinItemsResponse = response.json().await.map_err(|e| { DomainError::InfrastructureError(format!("Failed to parse Jellyfin response: {}", e)) })?; // Jellyfin's SeriesName query param is not a strict filter — it can // bleed items from other shows. Post-filter in Rust to guarantee that // only the requested series is returned. let items = body.items.into_iter().filter_map(map_jellyfin_item); let items: Vec = if let Some(name) = series_name { items .filter(|item| { item.series_name .as_deref() .map(|s| s.eq_ignore_ascii_case(name)) .unwrap_or(false) }) .collect() } else { items.collect() }; Ok(items) } } #[async_trait] impl IMediaProvider for JellyfinMediaProvider { fn capabilities(&self) -> ProviderCapabilities { ProviderCapabilities { collections: true, series: true, genres: true, tags: true, decade: true, search: true, streaming_protocol: StreamingProtocol::Hls, rescan: false, } } /// Fetch items matching `filter` from the Jellyfin library. /// /// When `series_names` has more than one entry the results from each series /// are fetched sequentially and concatenated (Jellyfin only supports one /// `SeriesName` param per request). async fn fetch_items(&self, filter: &MediaFilter) -> DomainResult> { match filter.series_names.len() { 0 | 1 => { let series = filter.series_names.first().map(String::as_str); self.fetch_items_for_series(filter, series).await } _ => { // Fetch each series independently, then interleave round-robin. // Round-robin ensures every show gets fair representation when a // downstream limit is applied (preview, block fill) even if one // series has far more episodes than another. let mut per_series: Vec> = Vec::new(); for series_name in &filter.series_names { let items = self .fetch_items_for_series(filter, Some(series_name.as_str())) .await?; if !items.is_empty() { per_series.push(items); } } let max_len = per_series.iter().map(|s| s.len()).max().unwrap_or(0); let mut all = Vec::with_capacity(per_series.iter().map(|s| s.len()).sum()); for i in 0..max_len { for s in &per_series { if let Some(item) = s.get(i) { all.push(item.clone()); } } } Ok(all) } } } /// Fetch a single item by its opaque ID. /// /// Returns `None` if the item is not found or cannot be mapped. async fn fetch_by_id(&self, item_id: &MediaItemId) -> DomainResult> { let url = format!( "{}/Users/{}/Items", self.config.base_url, self.config.user_id ); let response = self .client .get(&url) .header("X-Emby-Token", &self.config.api_key) .query(&[ ("Ids", item_id.as_ref()), ("Fields", "Genres,Tags,RunTimeTicks,ProductionYear"), ]) .send() .await .map_err(|e| { DomainError::InfrastructureError(format!("Jellyfin request failed: {}", e)) })?; if !response.status().is_success() { return Ok(None); } let body: JellyfinItemsResponse = response.json().await.map_err(|e| { DomainError::InfrastructureError(format!("Failed to parse Jellyfin response: {}", e)) })?; Ok(body.items.into_iter().next().and_then(map_jellyfin_item)) } /// List top-level virtual libraries available to the configured user. /// /// Uses the `/Users/{userId}/Views` endpoint which returns exactly the /// top-level nodes the user has access to (Movies, TV Shows, etc.). async fn list_collections(&self) -> DomainResult> { let url = format!( "{}/Users/{}/Views", self.config.base_url, self.config.user_id ); let response = self .client .get(&url) .header("X-Emby-Token", &self.config.api_key) .send() .await .map_err(|e| { DomainError::InfrastructureError(format!("Jellyfin request failed: {}", e)) })?; if !response.status().is_success() { return Err(DomainError::InfrastructureError(format!( "Jellyfin returned HTTP {}", response.status() ))); } let body: JellyfinItemsResponse = response.json().await.map_err(|e| { DomainError::InfrastructureError(format!("Failed to parse Jellyfin response: {}", e)) })?; Ok(body .items .into_iter() .map(|item| Collection { id: item.id, name: item.name, collection_type: item.collection_type, }) .collect()) } /// List all Series items, optionally scoped to a collection (ParentId). /// /// Results are sorted alphabetically. `RecursiveItemCount` gives the total /// episode count across all seasons without a second round-trip. async fn list_series(&self, collection_id: Option<&str>) -> DomainResult> { let url = format!( "{}/Users/{}/Items", self.config.base_url, self.config.user_id ); let mut params: Vec<(&str, String)> = vec![ ("Recursive", "true".into()), ("IncludeItemTypes", "Series".into()), ( "Fields", "Genres,ProductionYear,RecursiveItemCount".into(), ), ("SortBy", "SortName".into()), ("SortOrder", "Ascending".into()), ]; if let Some(id) = collection_id { params.push(("ParentId", id.to_string())); } let response = self .client .get(&url) .header("X-Emby-Token", &self.config.api_key) .query(¶ms) .send() .await .map_err(|e| { DomainError::InfrastructureError(format!("Jellyfin request failed: {}", e)) })?; if !response.status().is_success() { return Err(DomainError::InfrastructureError(format!( "Jellyfin returned HTTP {}", response.status() ))); } let body: JellyfinItemsResponse = response.json().await.map_err(|e| { DomainError::InfrastructureError(format!("Failed to parse Jellyfin response: {}", e)) })?; Ok(body .items .into_iter() .map(|item| SeriesSummary { id: item.id, name: item.name, episode_count: item.recursive_item_count.unwrap_or(0), genres: item.genres.unwrap_or_default(), year: item.production_year, }) .collect()) } /// List available genres from the Jellyfin `/Genres` endpoint. /// /// Optionally filtered to a specific content type (Movie or Episode). async fn list_genres( &self, content_type: Option<&ContentType>, ) -> DomainResult> { let url = format!("{}/Genres", self.config.base_url); let mut params: Vec<(&str, String)> = vec![ ("UserId", self.config.user_id.clone()), ("SortBy", "SortName".into()), ("SortOrder", "Ascending".into()), ]; if let Some(ct) = content_type { params.push(("IncludeItemTypes", jellyfin_item_type(ct).into())); } let response = self .client .get(&url) .header("X-Emby-Token", &self.config.api_key) .query(¶ms) .send() .await .map_err(|e| { DomainError::InfrastructureError(format!("Jellyfin request failed: {}", e)) })?; if !response.status().is_success() { return Err(DomainError::InfrastructureError(format!( "Jellyfin returned HTTP {}", response.status() ))); } let body: JellyfinItemsResponse = response.json().await.map_err(|e| { DomainError::InfrastructureError(format!("Failed to parse Jellyfin response: {}", e)) })?; Ok(body.items.into_iter().map(|item| item.name).collect()) } async fn get_stream_url(&self, item_id: &MediaItemId, quality: &StreamQuality) -> DomainResult { match quality { StreamQuality::Direct => { let url = format!("{}/Items/{}/PlaybackInfo", self.config.base_url, item_id.as_ref()); let resp = self.client.post(&url) .header("X-Emby-Token", &self.config.api_key) .query(&[("userId", &self.config.user_id), ("mediaSourceId", &item_id.as_ref().to_string())]) .json(&serde_json::json!({})) .send().await .map_err(|e| DomainError::InfrastructureError(format!("PlaybackInfo failed: {e}")))?; if resp.status().is_success() { let info: JellyfinPlaybackInfoResponse = resp.json().await .map_err(|e| DomainError::InfrastructureError(format!("PlaybackInfo parse failed: {e}")))?; if let Some(src) = info.media_sources.first() { if src.supports_direct_stream { if let Some(rel_url) = &src.direct_stream_url { return Ok(format!("{}{}&api_key={}", self.config.base_url, rel_url, self.config.api_key)); } } } } // Fallback: HLS at 8 Mbps Ok(self.hls_url(item_id, 8_000_000)) } StreamQuality::Transcode(bps) => Ok(self.hls_url(item_id, *bps)), } } } impl JellyfinMediaProvider { fn hls_url(&self, item_id: &MediaItemId, bitrate: u32) -> String { format!( "{}/Videos/{}/master.m3u8?videoCodec=h264&audioCodec=aac&VideoBitRate={}&mediaSourceId={}&api_key={}", self.config.base_url, item_id.as_ref(), bitrate, item_id.as_ref(), self.config.api_key, ) } }