refactor: extract provider registry to provider_registry.rs

This commit is contained in:
2026-03-16 04:36:41 +01:00
parent 5949ffc63b
commit b25ae95626
2 changed files with 213 additions and 174 deletions

View File

@@ -10,7 +10,7 @@ use axum::http::{HeaderName, HeaderValue};
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer}; use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
use tracing::info; use tracing::info;
use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService}; use domain::{ChannelService, IProviderRegistry, ScheduleEngineService, UserService};
use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository}; use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository};
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]
use infra::factory::build_transcode_settings_repository; use infra::factory::build_transcode_settings_repository;
@@ -19,6 +19,7 @@ use tokio::net::TcpListener;
mod config; mod config;
mod database; mod database;
mod provider_registry;
mod dto; mod dto;
mod error; mod error;
mod events; mod events;
@@ -31,7 +32,7 @@ mod state;
mod telemetry; mod telemetry;
mod webhook; mod webhook;
use crate::config::{Config, ConfigSource}; use crate::config::Config;
use crate::state::AppState; use crate::state::AppState;
#[tokio::main] #[tokio::main]
@@ -54,129 +55,13 @@ async fn main() -> anyhow::Result<()> {
let channel_service = ChannelService::new(channel_repo.clone()); let channel_service = ChannelService::new(channel_repo.clone());
// Build provider registry — all configured providers are registered simultaneously. // Build provider registry — all configured providers are registered simultaneously.
#[cfg(feature = "local-files")]
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
#[cfg(feature = "local-files")]
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
let mut registry = infra::ProviderRegistry::new();
let provider_config_repo = build_provider_config_repository(&db_pool).await?; let provider_config_repo = build_provider_config_repository(&db_pool).await?;
if config.config_source == ConfigSource::Db { let bundle = provider_registry::build_provider_registry(
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database"); &config, &db_pool, &provider_config_repo,
let rows = provider_config_repo.get_all().await?; ).await?;
for row in &rows {
if !row.enabled { continue; }
match row.provider_type.as_str() {
#[cfg(feature = "jellyfin")]
"jellyfin" => {
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
tracing::info!("Loading Jellyfin provider from DB config");
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
}
}
#[cfg(feature = "local-files")]
"local_files" => {
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
if let Some(files_dir) = cfg_map.get("files_dir") {
let transcode_dir = cfg_map.get("transcode_dir")
.filter(|s| !s.is_empty())
.map(std::path::PathBuf::from);
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
.and_then(|s| s.parse().ok())
.unwrap_or(24);
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
match infra::factory::build_local_files_bundle(
&db_pool,
std::path::PathBuf::from(files_dir),
transcode_dir,
cleanup_ttl_hours,
config.base_url.clone(),
).await {
Ok(bundle) => {
let scan_idx = Arc::clone(&bundle.local_index);
tokio::spawn(async move { scan_idx.rescan().await; });
if let Some(ref tm) = bundle.transcode_manager {
tracing::info!("Transcoding enabled");
// Load persisted TTL override from transcode_settings table.
let tm_clone = Arc::clone(tm);
let repo = build_transcode_settings_repository(&db_pool).await.ok();
tokio::spawn(async move {
if let Some(r) = repo {
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
tm_clone.set_cleanup_ttl(ttl);
}
}
});
}
registry.register("local", bundle.provider);
transcode_manager = bundle.transcode_manager;
local_index = Some(bundle.local_index);
}
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
}
}
}
}
_ => {}
}
}
} else {
#[cfg(feature = "jellyfin")]
if let (Some(base_url), Some(api_key), Some(user_id)) = (
&config.jellyfin_base_url,
&config.jellyfin_api_key,
&config.jellyfin_user_id,
) {
tracing::info!("Media provider: Jellyfin at {}", base_url);
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
base_url: base_url.clone(),
api_key: api_key.clone(),
user_id: user_id.clone(),
})));
}
#[cfg(feature = "local-files")] let registry_arc = bundle.registry;
if let Some(dir) = &config.local_files_dir {
tracing::info!("Media provider: local files at {:?}", dir);
match infra::factory::build_local_files_bundle(
&db_pool,
dir.clone(),
config.transcode_dir.clone(),
config.transcode_cleanup_ttl_hours,
config.base_url.clone(),
).await {
Ok(bundle) => {
let scan_idx = Arc::clone(&bundle.local_index);
tokio::spawn(async move { scan_idx.rescan().await; });
if let Some(ref tm) = bundle.transcode_manager {
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
let tm_clone = Arc::clone(tm);
let repo = build_transcode_settings_repository(&db_pool).await.ok();
tokio::spawn(async move {
if let Some(r) = repo {
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
tm_clone.set_cleanup_ttl(ttl);
}
}
});
}
registry.register("local", bundle.provider);
transcode_manager = bundle.transcode_manager;
local_index = Some(bundle.local_index);
}
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
}
}
}
if registry.is_empty() {
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
registry.register("noop", Arc::new(NoopMediaProvider));
}
let registry_arc = Arc::new(registry);
let provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>> = let provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>> =
Arc::new(tokio::sync::RwLock::new(Arc::clone(&registry_arc))); Arc::new(tokio::sync::RwLock::new(Arc::clone(&registry_arc)));
@@ -218,11 +103,11 @@ async fn main() -> anyhow::Result<()> {
.await?; .await?;
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]
if let Some(idx) = local_index { if let Some(idx) = bundle.local_index {
*state.local_index.write().await = Some(idx); *state.local_index.write().await = Some(idx);
} }
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]
if let Some(tm) = transcode_manager { if let Some(tm) = bundle.transcode_manager {
*state.transcode_manager.write().await = Some(tm); *state.transcode_manager.write().await = Some(tm);
} }
@@ -283,53 +168,3 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
/// Stand-in provider used when no real media source is configured.
/// Returns a descriptive error for every call so schedule endpoints fail
/// gracefully rather than panicking at startup.
struct NoopMediaProvider;
#[async_trait::async_trait]
impl IMediaProvider for NoopMediaProvider {
fn capabilities(&self) -> ProviderCapabilities {
ProviderCapabilities {
collections: false,
series: false,
genres: false,
tags: false,
decade: false,
search: false,
streaming_protocol: StreamingProtocol::DirectFile,
rescan: false,
transcode: false,
}
}
async fn fetch_items(
&self,
_: &domain::MediaFilter,
) -> domain::DomainResult<Vec<domain::MediaItem>> {
Err(domain::DomainError::InfrastructureError(
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
))
}
async fn fetch_by_id(
&self,
_: &domain::MediaItemId,
) -> domain::DomainResult<Option<domain::MediaItem>> {
Err(domain::DomainError::InfrastructureError(
"No media provider configured.".into(),
))
}
async fn get_stream_url(
&self,
_: &domain::MediaItemId,
_: &domain::StreamQuality,
) -> domain::DomainResult<String> {
Err(domain::DomainError::InfrastructureError(
"No media provider configured.".into(),
))
}
}

View File

@@ -0,0 +1,204 @@
use std::sync::Arc;
use domain::{
DomainError, IMediaProvider, ProviderCapabilities, ProviderConfigRepository,
StreamingProtocol, StreamQuality,
};
use k_core::db::DatabasePool;
use crate::config::{Config, ConfigSource};
#[cfg(feature = "local-files")]
use infra::factory::build_transcode_settings_repository;
pub struct ProviderBundle {
pub registry: Arc<infra::ProviderRegistry>,
#[cfg(feature = "local-files")]
pub local_index: Option<Arc<infra::LocalIndex>>,
#[cfg(feature = "local-files")]
pub transcode_manager: Option<Arc<infra::TranscodeManager>>,
}
pub async fn build_provider_registry(
config: &Config,
db_pool: &Arc<DatabasePool>,
provider_config_repo: &Arc<dyn ProviderConfigRepository>,
) -> anyhow::Result<ProviderBundle> {
#[cfg(feature = "local-files")]
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
#[cfg(feature = "local-files")]
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
let mut registry = infra::ProviderRegistry::new();
if config.config_source == ConfigSource::Db {
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
let rows = provider_config_repo.get_all().await?;
for row in &rows {
if !row.enabled { continue; }
match row.provider_type.as_str() {
#[cfg(feature = "jellyfin")]
"jellyfin" => {
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
tracing::info!("Loading Jellyfin provider from DB config");
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
}
}
#[cfg(feature = "local-files")]
"local_files" => {
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
if let Some(files_dir) = cfg_map.get("files_dir") {
let transcode_dir = cfg_map.get("transcode_dir")
.filter(|s| !s.is_empty())
.map(std::path::PathBuf::from);
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
.and_then(|s| s.parse().ok())
.unwrap_or(24);
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
match infra::factory::build_local_files_bundle(
db_pool,
std::path::PathBuf::from(files_dir),
transcode_dir,
cleanup_ttl_hours,
config.base_url.clone(),
).await {
Ok(bundle) => {
let scan_idx = Arc::clone(&bundle.local_index);
tokio::spawn(async move { scan_idx.rescan().await; });
if let Some(ref tm) = bundle.transcode_manager {
tracing::info!("Transcoding enabled");
// Load persisted TTL override from transcode_settings table.
let tm_clone = Arc::clone(tm);
let repo = build_transcode_settings_repository(db_pool).await.ok();
tokio::spawn(async move {
if let Some(r) = repo {
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
tm_clone.set_cleanup_ttl(ttl);
}
}
});
}
registry.register("local", bundle.provider);
transcode_manager = bundle.transcode_manager;
local_index = Some(bundle.local_index);
}
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
}
}
}
}
_ => {}
}
}
} else {
#[cfg(feature = "jellyfin")]
if let (Some(base_url), Some(api_key), Some(user_id)) = (
&config.jellyfin_base_url,
&config.jellyfin_api_key,
&config.jellyfin_user_id,
) {
tracing::info!("Media provider: Jellyfin at {}", base_url);
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
base_url: base_url.clone(),
api_key: api_key.clone(),
user_id: user_id.clone(),
})));
}
#[cfg(feature = "local-files")]
if let Some(dir) = &config.local_files_dir {
tracing::info!("Media provider: local files at {:?}", dir);
match infra::factory::build_local_files_bundle(
db_pool,
dir.clone(),
config.transcode_dir.clone(),
config.transcode_cleanup_ttl_hours,
config.base_url.clone(),
).await {
Ok(bundle) => {
let scan_idx = Arc::clone(&bundle.local_index);
tokio::spawn(async move { scan_idx.rescan().await; });
if let Some(ref tm) = bundle.transcode_manager {
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
let tm_clone = Arc::clone(tm);
let repo = build_transcode_settings_repository(db_pool).await.ok();
tokio::spawn(async move {
if let Some(r) = repo {
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
tm_clone.set_cleanup_ttl(ttl);
}
}
});
}
registry.register("local", bundle.provider);
transcode_manager = bundle.transcode_manager;
local_index = Some(bundle.local_index);
}
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
}
}
}
if registry.is_empty() {
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
registry.register("noop", Arc::new(NoopMediaProvider));
}
Ok(ProviderBundle {
registry: Arc::new(registry),
#[cfg(feature = "local-files")]
local_index,
#[cfg(feature = "local-files")]
transcode_manager,
})
}
/// Stand-in provider used when no real media source is configured.
/// Returns a descriptive error for every call so schedule endpoints fail
/// gracefully rather than panicking at startup.
struct NoopMediaProvider;
#[async_trait::async_trait]
impl IMediaProvider for NoopMediaProvider {
fn capabilities(&self) -> ProviderCapabilities {
ProviderCapabilities {
collections: false,
series: false,
genres: false,
tags: false,
decade: false,
search: false,
streaming_protocol: StreamingProtocol::DirectFile,
rescan: false,
transcode: false,
}
}
async fn fetch_items(
&self,
_: &domain::MediaFilter,
) -> domain::DomainResult<Vec<domain::MediaItem>> {
Err(DomainError::InfrastructureError(
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
))
}
async fn fetch_by_id(
&self,
_: &domain::MediaItemId,
) -> domain::DomainResult<Option<domain::MediaItem>> {
Err(DomainError::InfrastructureError(
"No media provider configured.".into(),
))
}
async fn get_stream_url(
&self,
_: &domain::MediaItemId,
_: &StreamQuality,
) -> domain::DomainResult<String> {
Err(DomainError::InfrastructureError(
"No media provider configured.".into(),
))
}
}