Compare commits
6 Commits
9d792249c9
...
7244349e97
| Author | SHA1 | Date | |
|---|---|---|---|
| 7244349e97 | |||
| 6aa86b6666 | |||
| e7bd66ffdf | |||
| b25ae95626 | |||
| 5949ffc63b | |||
| 29e654cabc |
36
k-tv-backend/api/src/database.rs
Normal file
36
k-tv-backend/api/src/database.rs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration as StdDuration;
|
||||||
|
|
||||||
|
use crate::config::Config;
|
||||||
|
use infra::run_migrations;
|
||||||
|
use k_core::db::DatabasePool;
|
||||||
|
|
||||||
|
pub async fn init_database(config: &Config) -> anyhow::Result<Arc<DatabasePool>> {
|
||||||
|
tracing::info!("Connecting to database: {}", config.database_url);
|
||||||
|
|
||||||
|
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
|
||||||
|
let db_type = k_core::db::DbType::Sqlite;
|
||||||
|
|
||||||
|
#[cfg(all(feature = "postgres", not(feature = "sqlite")))]
|
||||||
|
let db_type = k_core::db::DbType::Postgres;
|
||||||
|
|
||||||
|
// Both features enabled: fall back to URL inspection at runtime
|
||||||
|
#[cfg(all(feature = "sqlite", feature = "postgres"))]
|
||||||
|
let db_type = if config.database_url.starts_with("postgres") {
|
||||||
|
k_core::db::DbType::Postgres
|
||||||
|
} else {
|
||||||
|
k_core::db::DbType::Sqlite
|
||||||
|
};
|
||||||
|
|
||||||
|
let db_config = k_core::db::DatabaseConfig {
|
||||||
|
db_type,
|
||||||
|
url: config.database_url.clone(),
|
||||||
|
max_connections: config.db_max_connections,
|
||||||
|
min_connections: config.db_min_connections,
|
||||||
|
acquire_timeout: StdDuration::from_secs(30),
|
||||||
|
};
|
||||||
|
|
||||||
|
let pool = k_core::db::connect(&db_config).await?;
|
||||||
|
run_migrations(&pool).await?;
|
||||||
|
Ok(Arc::new(pool))
|
||||||
|
}
|
||||||
@@ -2,27 +2,18 @@
|
|||||||
//!
|
//!
|
||||||
//! Configures and starts the HTTP server with JWT-based authentication.
|
//! Configures and starts the HTTP server with JWT-based authentication.
|
||||||
|
|
||||||
use std::collections::VecDeque;
|
use std::sync::Arc;
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::time::Duration as StdDuration;
|
|
||||||
|
|
||||||
use axum::Router;
|
|
||||||
use axum::http::{HeaderName, HeaderValue};
|
|
||||||
use tokio::sync::broadcast;
|
|
||||||
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
|
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
|
|
||||||
|
|
||||||
use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService};
|
use domain::{ChannelService, IProviderRegistry, ScheduleEngineService, UserService};
|
||||||
use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository};
|
use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository};
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
use infra::factory::build_transcode_settings_repository;
|
use infra::factory::build_transcode_settings_repository;
|
||||||
use infra::run_migrations;
|
|
||||||
use k_core::http::server::{ServerConfig, apply_standard_middleware};
|
|
||||||
use tokio::net::TcpListener;
|
|
||||||
|
|
||||||
mod config;
|
mod config;
|
||||||
|
mod database;
|
||||||
|
mod provider_registry;
|
||||||
mod dto;
|
mod dto;
|
||||||
mod error;
|
mod error;
|
||||||
mod events;
|
mod events;
|
||||||
@@ -31,57 +22,25 @@ mod log_layer;
|
|||||||
mod poller;
|
mod poller;
|
||||||
mod routes;
|
mod routes;
|
||||||
mod scheduler;
|
mod scheduler;
|
||||||
|
mod server;
|
||||||
|
mod startup;
|
||||||
mod state;
|
mod state;
|
||||||
|
mod telemetry;
|
||||||
mod webhook;
|
mod webhook;
|
||||||
|
|
||||||
use crate::config::{Config, ConfigSource};
|
use crate::config::Config;
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
// Set up broadcast channel + ring buffer for SSE log streaming.
|
let handles = telemetry::init_tracing();
|
||||||
let (log_tx, _) = broadcast::channel::<log_layer::LogLine>(512);
|
|
||||||
let log_history = Arc::new(Mutex::new(VecDeque::<log_layer::LogLine>::new()));
|
|
||||||
|
|
||||||
// Initialize tracing with our custom layer in addition to the fmt layer.
|
|
||||||
tracing_subscriber::registry()
|
|
||||||
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
|
|
||||||
.with(fmt::layer())
|
|
||||||
.with(log_layer::AppLogLayer::new(log_tx.clone(), Arc::clone(&log_history)))
|
|
||||||
.init();
|
|
||||||
|
|
||||||
let config = Config::from_env();
|
let config = Config::from_env();
|
||||||
|
|
||||||
info!("Starting server on {}:{}", config.host, config.port);
|
info!("Starting server on {}:{}", config.host, config.port);
|
||||||
|
|
||||||
// Setup database
|
// Setup database
|
||||||
tracing::info!("Connecting to database: {}", config.database_url);
|
let db_pool = database::init_database(&config).await?;
|
||||||
|
|
||||||
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
|
|
||||||
let db_type = k_core::db::DbType::Sqlite;
|
|
||||||
|
|
||||||
#[cfg(all(feature = "postgres", not(feature = "sqlite")))]
|
|
||||||
let db_type = k_core::db::DbType::Postgres;
|
|
||||||
|
|
||||||
// Both features enabled: fall back to URL inspection at runtime
|
|
||||||
#[cfg(all(feature = "sqlite", feature = "postgres"))]
|
|
||||||
let db_type = if config.database_url.starts_with("postgres") {
|
|
||||||
k_core::db::DbType::Postgres
|
|
||||||
} else {
|
|
||||||
k_core::db::DbType::Sqlite
|
|
||||||
};
|
|
||||||
|
|
||||||
let db_config = k_core::db::DatabaseConfig {
|
|
||||||
db_type,
|
|
||||||
url: config.database_url.clone(),
|
|
||||||
max_connections: config.db_max_connections,
|
|
||||||
min_connections: config.db_min_connections,
|
|
||||||
acquire_timeout: StdDuration::from_secs(30),
|
|
||||||
};
|
|
||||||
|
|
||||||
let db_pool = k_core::db::connect(&db_config).await?;
|
|
||||||
run_migrations(&db_pool).await?;
|
|
||||||
let db_pool = Arc::new(db_pool);
|
|
||||||
|
|
||||||
let user_repo = build_user_repository(&db_pool).await?;
|
let user_repo = build_user_repository(&db_pool).await?;
|
||||||
let channel_repo = build_channel_repository(&db_pool).await?;
|
let channel_repo = build_channel_repository(&db_pool).await?;
|
||||||
@@ -92,129 +51,13 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let channel_service = ChannelService::new(channel_repo.clone());
|
let channel_service = ChannelService::new(channel_repo.clone());
|
||||||
|
|
||||||
// Build provider registry — all configured providers are registered simultaneously.
|
// Build provider registry — all configured providers are registered simultaneously.
|
||||||
#[cfg(feature = "local-files")]
|
|
||||||
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
|
|
||||||
#[cfg(feature = "local-files")]
|
|
||||||
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
|
|
||||||
|
|
||||||
let mut registry = infra::ProviderRegistry::new();
|
|
||||||
|
|
||||||
let provider_config_repo = build_provider_config_repository(&db_pool).await?;
|
let provider_config_repo = build_provider_config_repository(&db_pool).await?;
|
||||||
|
|
||||||
if config.config_source == ConfigSource::Db {
|
let bundle = provider_registry::build_provider_registry(
|
||||||
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
|
&config, &db_pool, &provider_config_repo,
|
||||||
let rows = provider_config_repo.get_all().await?;
|
).await?;
|
||||||
for row in &rows {
|
|
||||||
if !row.enabled { continue; }
|
|
||||||
match row.provider_type.as_str() {
|
|
||||||
#[cfg(feature = "jellyfin")]
|
|
||||||
"jellyfin" => {
|
|
||||||
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
|
|
||||||
tracing::info!("Loading Jellyfin provider from DB config");
|
|
||||||
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[cfg(feature = "local-files")]
|
|
||||||
"local_files" => {
|
|
||||||
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
|
|
||||||
if let Some(files_dir) = cfg_map.get("files_dir") {
|
|
||||||
let transcode_dir = cfg_map.get("transcode_dir")
|
|
||||||
.filter(|s| !s.is_empty())
|
|
||||||
.map(std::path::PathBuf::from);
|
|
||||||
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
|
|
||||||
.and_then(|s| s.parse().ok())
|
|
||||||
.unwrap_or(24);
|
|
||||||
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
|
|
||||||
match infra::factory::build_local_files_bundle(
|
|
||||||
&db_pool,
|
|
||||||
std::path::PathBuf::from(files_dir),
|
|
||||||
transcode_dir,
|
|
||||||
cleanup_ttl_hours,
|
|
||||||
config.base_url.clone(),
|
|
||||||
).await {
|
|
||||||
Ok(bundle) => {
|
|
||||||
let scan_idx = Arc::clone(&bundle.local_index);
|
|
||||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
|
||||||
if let Some(ref tm) = bundle.transcode_manager {
|
|
||||||
tracing::info!("Transcoding enabled");
|
|
||||||
// Load persisted TTL override from transcode_settings table.
|
|
||||||
let tm_clone = Arc::clone(tm);
|
|
||||||
let repo = build_transcode_settings_repository(&db_pool).await.ok();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Some(r) = repo {
|
|
||||||
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
|
||||||
tm_clone.set_cleanup_ttl(ttl);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
registry.register("local", bundle.provider);
|
|
||||||
transcode_manager = bundle.transcode_manager;
|
|
||||||
local_index = Some(bundle.local_index);
|
|
||||||
}
|
|
||||||
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
#[cfg(feature = "jellyfin")]
|
|
||||||
if let (Some(base_url), Some(api_key), Some(user_id)) = (
|
|
||||||
&config.jellyfin_base_url,
|
|
||||||
&config.jellyfin_api_key,
|
|
||||||
&config.jellyfin_user_id,
|
|
||||||
) {
|
|
||||||
tracing::info!("Media provider: Jellyfin at {}", base_url);
|
|
||||||
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
|
|
||||||
base_url: base_url.clone(),
|
|
||||||
api_key: api_key.clone(),
|
|
||||||
user_id: user_id.clone(),
|
|
||||||
})));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "local-files")]
|
let registry_arc = bundle.registry;
|
||||||
if let Some(dir) = &config.local_files_dir {
|
|
||||||
tracing::info!("Media provider: local files at {:?}", dir);
|
|
||||||
match infra::factory::build_local_files_bundle(
|
|
||||||
&db_pool,
|
|
||||||
dir.clone(),
|
|
||||||
config.transcode_dir.clone(),
|
|
||||||
config.transcode_cleanup_ttl_hours,
|
|
||||||
config.base_url.clone(),
|
|
||||||
).await {
|
|
||||||
Ok(bundle) => {
|
|
||||||
let scan_idx = Arc::clone(&bundle.local_index);
|
|
||||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
|
||||||
if let Some(ref tm) = bundle.transcode_manager {
|
|
||||||
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
|
|
||||||
let tm_clone = Arc::clone(tm);
|
|
||||||
let repo = build_transcode_settings_repository(&db_pool).await.ok();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Some(r) = repo {
|
|
||||||
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
|
||||||
tm_clone.set_cleanup_ttl(ttl);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
registry.register("local", bundle.provider);
|
|
||||||
transcode_manager = bundle.transcode_manager;
|
|
||||||
local_index = Some(bundle.local_index);
|
|
||||||
}
|
|
||||||
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if registry.is_empty() {
|
|
||||||
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
|
|
||||||
registry.register("noop", Arc::new(NoopMediaProvider));
|
|
||||||
}
|
|
||||||
|
|
||||||
let registry_arc = Arc::new(registry);
|
|
||||||
let provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>> =
|
let provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>> =
|
||||||
Arc::new(tokio::sync::RwLock::new(Arc::clone(®istry_arc)));
|
Arc::new(tokio::sync::RwLock::new(Arc::clone(®istry_arc)));
|
||||||
|
|
||||||
@@ -246,8 +89,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
provider_config_repo,
|
provider_config_repo,
|
||||||
config.clone(),
|
config.clone(),
|
||||||
event_tx.clone(),
|
event_tx.clone(),
|
||||||
log_tx,
|
handles.log_tx,
|
||||||
log_history,
|
handles.log_history,
|
||||||
activity_log_repo,
|
activity_log_repo,
|
||||||
db_pool,
|
db_pool,
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
@@ -256,118 +99,19 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
if let Some(idx) = local_index {
|
if let Some(idx) = bundle.local_index {
|
||||||
*state.local_index.write().await = Some(idx);
|
*state.local_index.write().await = Some(idx);
|
||||||
}
|
}
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
if let Some(tm) = transcode_manager {
|
if let Some(tm) = bundle.transcode_manager {
|
||||||
*state.transcode_manager.write().await = Some(tm);
|
*state.transcode_manager.write().await = Some(tm);
|
||||||
}
|
}
|
||||||
|
|
||||||
let server_config = ServerConfig {
|
startup::spawn_background_tasks(
|
||||||
cors_origins: config.cors_allowed_origins.clone(),
|
Arc::clone(&state.schedule_engine),
|
||||||
};
|
bg_channel_repo,
|
||||||
|
|
||||||
let bg_channel_repo_poller = bg_channel_repo.clone();
|
|
||||||
let bg_schedule_engine = Arc::clone(&state.schedule_engine);
|
|
||||||
tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo, event_tx.clone()));
|
|
||||||
|
|
||||||
let bg_schedule_engine_poller = Arc::clone(&state.schedule_engine);
|
|
||||||
tokio::spawn(poller::run_broadcast_poller(
|
|
||||||
bg_schedule_engine_poller,
|
|
||||||
bg_channel_repo_poller,
|
|
||||||
event_tx,
|
event_tx,
|
||||||
));
|
);
|
||||||
|
|
||||||
let app = Router::new()
|
server::build_and_serve(state, &config).await
|
||||||
.nest("/api/v1", routes::api_v1_router())
|
|
||||||
.with_state(state);
|
|
||||||
|
|
||||||
let app = apply_standard_middleware(app, &server_config);
|
|
||||||
|
|
||||||
// Wrap with an outer CorsLayer that includes the custom password headers.
|
|
||||||
// Being outermost it handles OPTIONS preflights before k_core's inner layer.
|
|
||||||
let origins: Vec<HeaderValue> = config
|
|
||||||
.cors_allowed_origins
|
|
||||||
.iter()
|
|
||||||
.filter_map(|o| o.parse().ok())
|
|
||||||
.collect();
|
|
||||||
let cors = CorsLayer::new()
|
|
||||||
.allow_origin(AllowOrigin::list(origins))
|
|
||||||
.allow_methods(AllowMethods::any())
|
|
||||||
.allow_headers(AllowHeaders::list([
|
|
||||||
axum::http::header::AUTHORIZATION,
|
|
||||||
axum::http::header::CONTENT_TYPE,
|
|
||||||
HeaderName::from_static("x-channel-password"),
|
|
||||||
HeaderName::from_static("x-block-password"),
|
|
||||||
]));
|
|
||||||
let app = app.layer(cors);
|
|
||||||
|
|
||||||
let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
|
|
||||||
let listener = TcpListener::bind(addr).await?;
|
|
||||||
|
|
||||||
tracing::info!("🚀 API server running at http://{}", addr);
|
|
||||||
tracing::info!("🔒 Authentication mode: JWT (Bearer token)");
|
|
||||||
|
|
||||||
#[cfg(feature = "auth-jwt")]
|
|
||||||
tracing::info!(" ✓ JWT auth enabled");
|
|
||||||
|
|
||||||
#[cfg(feature = "auth-oidc")]
|
|
||||||
tracing::info!(" ✓ OIDC integration enabled (stateless cookie state)");
|
|
||||||
|
|
||||||
tracing::info!("📝 API endpoints available at /api/v1/...");
|
|
||||||
|
|
||||||
axum::serve(listener, app).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stand-in provider used when no real media source is configured.
|
|
||||||
/// Returns a descriptive error for every call so schedule endpoints fail
|
|
||||||
/// gracefully rather than panicking at startup.
|
|
||||||
struct NoopMediaProvider;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl IMediaProvider for NoopMediaProvider {
|
|
||||||
fn capabilities(&self) -> ProviderCapabilities {
|
|
||||||
ProviderCapabilities {
|
|
||||||
collections: false,
|
|
||||||
series: false,
|
|
||||||
genres: false,
|
|
||||||
tags: false,
|
|
||||||
decade: false,
|
|
||||||
search: false,
|
|
||||||
streaming_protocol: StreamingProtocol::DirectFile,
|
|
||||||
rescan: false,
|
|
||||||
transcode: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn fetch_items(
|
|
||||||
&self,
|
|
||||||
_: &domain::MediaFilter,
|
|
||||||
) -> domain::DomainResult<Vec<domain::MediaItem>> {
|
|
||||||
Err(domain::DomainError::InfrastructureError(
|
|
||||||
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn fetch_by_id(
|
|
||||||
&self,
|
|
||||||
_: &domain::MediaItemId,
|
|
||||||
) -> domain::DomainResult<Option<domain::MediaItem>> {
|
|
||||||
Err(domain::DomainError::InfrastructureError(
|
|
||||||
"No media provider configured.".into(),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_stream_url(
|
|
||||||
&self,
|
|
||||||
_: &domain::MediaItemId,
|
|
||||||
_: &domain::StreamQuality,
|
|
||||||
) -> domain::DomainResult<String> {
|
|
||||||
Err(domain::DomainError::InfrastructureError(
|
|
||||||
"No media provider configured.".into(),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
205
k-tv-backend/api/src/provider_registry.rs
Normal file
205
k-tv-backend/api/src/provider_registry.rs
Normal file
@@ -0,0 +1,205 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use domain::{
|
||||||
|
DomainError, IMediaProvider, ProviderCapabilities, ProviderConfigRepository,
|
||||||
|
StreamingProtocol, StreamQuality,
|
||||||
|
};
|
||||||
|
use k_core::db::DatabasePool;
|
||||||
|
|
||||||
|
use crate::config::{Config, ConfigSource};
|
||||||
|
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
use infra::factory::build_transcode_settings_repository;
|
||||||
|
|
||||||
|
pub struct ProviderBundle {
|
||||||
|
pub registry: Arc<infra::ProviderRegistry>,
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
pub local_index: Option<Arc<infra::LocalIndex>>,
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
pub transcode_manager: Option<Arc<infra::TranscodeManager>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn build_provider_registry(
|
||||||
|
config: &Config,
|
||||||
|
#[cfg_attr(not(feature = "local-files"), allow(unused_variables))]
|
||||||
|
db_pool: &Arc<DatabasePool>,
|
||||||
|
provider_config_repo: &Arc<dyn ProviderConfigRepository>,
|
||||||
|
) -> anyhow::Result<ProviderBundle> {
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
|
||||||
|
|
||||||
|
let mut registry = infra::ProviderRegistry::new();
|
||||||
|
|
||||||
|
if config.config_source == ConfigSource::Db {
|
||||||
|
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
|
||||||
|
let rows = provider_config_repo.get_all().await?;
|
||||||
|
for row in &rows {
|
||||||
|
if !row.enabled { continue; }
|
||||||
|
match row.provider_type.as_str() {
|
||||||
|
#[cfg(feature = "jellyfin")]
|
||||||
|
"jellyfin" => {
|
||||||
|
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
|
||||||
|
tracing::info!("Loading Jellyfin provider from DB config");
|
||||||
|
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
"local_files" => {
|
||||||
|
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
|
||||||
|
if let Some(files_dir) = cfg_map.get("files_dir") {
|
||||||
|
let transcode_dir = cfg_map.get("transcode_dir")
|
||||||
|
.filter(|s| !s.is_empty())
|
||||||
|
.map(std::path::PathBuf::from);
|
||||||
|
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(24);
|
||||||
|
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
|
||||||
|
match infra::factory::build_local_files_bundle(
|
||||||
|
db_pool,
|
||||||
|
std::path::PathBuf::from(files_dir),
|
||||||
|
transcode_dir,
|
||||||
|
cleanup_ttl_hours,
|
||||||
|
config.base_url.clone(),
|
||||||
|
).await {
|
||||||
|
Ok(bundle) => {
|
||||||
|
let scan_idx = Arc::clone(&bundle.local_index);
|
||||||
|
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||||
|
if let Some(ref tm) = bundle.transcode_manager {
|
||||||
|
tracing::info!("Transcoding enabled");
|
||||||
|
// Load persisted TTL override from transcode_settings table.
|
||||||
|
let tm_clone = Arc::clone(tm);
|
||||||
|
let repo = build_transcode_settings_repository(db_pool).await.ok();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Some(r) = repo {
|
||||||
|
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
||||||
|
tm_clone.set_cleanup_ttl(ttl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
registry.register("local", bundle.provider);
|
||||||
|
transcode_manager = bundle.transcode_manager;
|
||||||
|
local_index = Some(bundle.local_index);
|
||||||
|
}
|
||||||
|
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
#[cfg(feature = "jellyfin")]
|
||||||
|
if let (Some(base_url), Some(api_key), Some(user_id)) = (
|
||||||
|
&config.jellyfin_base_url,
|
||||||
|
&config.jellyfin_api_key,
|
||||||
|
&config.jellyfin_user_id,
|
||||||
|
) {
|
||||||
|
tracing::info!("Media provider: Jellyfin at {}", base_url);
|
||||||
|
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
|
||||||
|
base_url: base_url.clone(),
|
||||||
|
api_key: api_key.clone(),
|
||||||
|
user_id: user_id.clone(),
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
if let Some(dir) = &config.local_files_dir {
|
||||||
|
tracing::info!("Media provider: local files at {:?}", dir);
|
||||||
|
match infra::factory::build_local_files_bundle(
|
||||||
|
db_pool,
|
||||||
|
dir.clone(),
|
||||||
|
config.transcode_dir.clone(),
|
||||||
|
config.transcode_cleanup_ttl_hours,
|
||||||
|
config.base_url.clone(),
|
||||||
|
).await {
|
||||||
|
Ok(bundle) => {
|
||||||
|
let scan_idx = Arc::clone(&bundle.local_index);
|
||||||
|
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||||
|
if let Some(ref tm) = bundle.transcode_manager {
|
||||||
|
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
|
||||||
|
let tm_clone = Arc::clone(tm);
|
||||||
|
let repo = build_transcode_settings_repository(db_pool).await.ok();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Some(r) = repo {
|
||||||
|
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
||||||
|
tm_clone.set_cleanup_ttl(ttl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
registry.register("local", bundle.provider);
|
||||||
|
transcode_manager = bundle.transcode_manager;
|
||||||
|
local_index = Some(bundle.local_index);
|
||||||
|
}
|
||||||
|
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if registry.is_empty() {
|
||||||
|
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
|
||||||
|
registry.register("noop", Arc::new(NoopMediaProvider));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(ProviderBundle {
|
||||||
|
registry: Arc::new(registry),
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
local_index,
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
transcode_manager,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stand-in provider used when no real media source is configured.
|
||||||
|
/// Returns a descriptive error for every call so schedule endpoints fail
|
||||||
|
/// gracefully rather than panicking at startup.
|
||||||
|
struct NoopMediaProvider;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl IMediaProvider for NoopMediaProvider {
|
||||||
|
fn capabilities(&self) -> ProviderCapabilities {
|
||||||
|
ProviderCapabilities {
|
||||||
|
collections: false,
|
||||||
|
series: false,
|
||||||
|
genres: false,
|
||||||
|
tags: false,
|
||||||
|
decade: false,
|
||||||
|
search: false,
|
||||||
|
streaming_protocol: StreamingProtocol::DirectFile,
|
||||||
|
rescan: false,
|
||||||
|
transcode: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_items(
|
||||||
|
&self,
|
||||||
|
_: &domain::MediaFilter,
|
||||||
|
) -> domain::DomainResult<Vec<domain::MediaItem>> {
|
||||||
|
Err(DomainError::InfrastructureError(
|
||||||
|
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_by_id(
|
||||||
|
&self,
|
||||||
|
_: &domain::MediaItemId,
|
||||||
|
) -> domain::DomainResult<Option<domain::MediaItem>> {
|
||||||
|
Err(DomainError::InfrastructureError(
|
||||||
|
"No media provider configured.".into(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_stream_url(
|
||||||
|
&self,
|
||||||
|
_: &domain::MediaItemId,
|
||||||
|
_: &StreamQuality,
|
||||||
|
) -> domain::DomainResult<String> {
|
||||||
|
Err(DomainError::InfrastructureError(
|
||||||
|
"No media provider configured.".into(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
59
k-tv-backend/api/src/server.rs
Normal file
59
k-tv-backend/api/src/server.rs
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use axum::Router;
|
||||||
|
use axum::http::{HeaderName, HeaderValue};
|
||||||
|
use k_core::http::server::{ServerConfig, apply_standard_middleware};
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
|
||||||
|
|
||||||
|
use crate::config::Config;
|
||||||
|
use crate::routes;
|
||||||
|
use crate::state::AppState;
|
||||||
|
|
||||||
|
pub async fn build_and_serve(state: AppState, config: &Config) -> anyhow::Result<()> {
|
||||||
|
let server_config = ServerConfig {
|
||||||
|
cors_origins: config.cors_allowed_origins.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let app = Router::new()
|
||||||
|
.nest("/api/v1", routes::api_v1_router())
|
||||||
|
.with_state(state);
|
||||||
|
|
||||||
|
let app = apply_standard_middleware(app, &server_config);
|
||||||
|
|
||||||
|
// Wrap with an outer CorsLayer that includes the custom password headers.
|
||||||
|
// Being outermost it handles OPTIONS preflights before k_core's inner layer.
|
||||||
|
let origins: Vec<HeaderValue> = config
|
||||||
|
.cors_allowed_origins
|
||||||
|
.iter()
|
||||||
|
.filter_map(|o| o.parse().ok())
|
||||||
|
.collect();
|
||||||
|
let cors = CorsLayer::new()
|
||||||
|
.allow_origin(AllowOrigin::list(origins))
|
||||||
|
.allow_methods(AllowMethods::any())
|
||||||
|
.allow_headers(AllowHeaders::list([
|
||||||
|
axum::http::header::AUTHORIZATION,
|
||||||
|
axum::http::header::CONTENT_TYPE,
|
||||||
|
HeaderName::from_static("x-channel-password"),
|
||||||
|
HeaderName::from_static("x-block-password"),
|
||||||
|
]));
|
||||||
|
let app = app.layer(cors);
|
||||||
|
|
||||||
|
let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
|
||||||
|
let listener = TcpListener::bind(addr).await?;
|
||||||
|
|
||||||
|
tracing::info!("🚀 API server running at http://{}", addr);
|
||||||
|
tracing::info!("🔒 Authentication mode: JWT (Bearer token)");
|
||||||
|
|
||||||
|
#[cfg(feature = "auth-jwt")]
|
||||||
|
tracing::info!(" ✓ JWT auth enabled");
|
||||||
|
|
||||||
|
#[cfg(feature = "auth-oidc")]
|
||||||
|
tracing::info!(" ✓ OIDC integration enabled (stateless cookie state)");
|
||||||
|
|
||||||
|
tracing::info!("📝 API endpoints available at /api/v1/...");
|
||||||
|
|
||||||
|
axum::serve(listener, app).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
24
k-tv-backend/api/src/startup.rs
Normal file
24
k-tv-backend/api/src/startup.rs
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use domain::{ChannelRepository, DomainEvent, ScheduleEngineService};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
use crate::{poller, scheduler};
|
||||||
|
|
||||||
|
pub fn spawn_background_tasks(
|
||||||
|
schedule_engine: Arc<ScheduleEngineService>,
|
||||||
|
channel_repo: Arc<dyn ChannelRepository>,
|
||||||
|
event_tx: broadcast::Sender<DomainEvent>,
|
||||||
|
) {
|
||||||
|
let bg_channel_repo = channel_repo.clone();
|
||||||
|
tokio::spawn(scheduler::run_auto_scheduler(
|
||||||
|
Arc::clone(&schedule_engine),
|
||||||
|
bg_channel_repo,
|
||||||
|
event_tx.clone(),
|
||||||
|
));
|
||||||
|
tokio::spawn(poller::run_broadcast_poller(
|
||||||
|
schedule_engine,
|
||||||
|
channel_repo,
|
||||||
|
event_tx,
|
||||||
|
));
|
||||||
|
}
|
||||||
25
k-tv-backend/api/src/telemetry.rs
Normal file
25
k-tv-backend/api/src/telemetry.rs
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
|
||||||
|
use crate::log_layer::{AppLogLayer, LogLine};
|
||||||
|
|
||||||
|
pub struct LoggingHandles {
|
||||||
|
pub log_tx: broadcast::Sender<LogLine>,
|
||||||
|
pub log_history: Arc<Mutex<VecDeque<LogLine>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn init_tracing() -> LoggingHandles {
|
||||||
|
let (log_tx, _) = broadcast::channel::<LogLine>(512);
|
||||||
|
let log_history = Arc::new(Mutex::new(VecDeque::<LogLine>::new()));
|
||||||
|
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
|
||||||
|
.with(fmt::layer())
|
||||||
|
.with(AppLogLayer::new(log_tx.clone(), Arc::clone(&log_history)))
|
||||||
|
.init();
|
||||||
|
|
||||||
|
LoggingHandles { log_tx, log_history }
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user