Compare commits
6 Commits
9d792249c9
...
7244349e97
| Author | SHA1 | Date | |
|---|---|---|---|
| 7244349e97 | |||
| 6aa86b6666 | |||
| e7bd66ffdf | |||
| b25ae95626 | |||
| 5949ffc63b | |||
| 29e654cabc |
36
k-tv-backend/api/src/database.rs
Normal file
36
k-tv-backend/api/src/database.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration as StdDuration;
|
||||
|
||||
use crate::config::Config;
|
||||
use infra::run_migrations;
|
||||
use k_core::db::DatabasePool;
|
||||
|
||||
pub async fn init_database(config: &Config) -> anyhow::Result<Arc<DatabasePool>> {
|
||||
tracing::info!("Connecting to database: {}", config.database_url);
|
||||
|
||||
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
|
||||
let db_type = k_core::db::DbType::Sqlite;
|
||||
|
||||
#[cfg(all(feature = "postgres", not(feature = "sqlite")))]
|
||||
let db_type = k_core::db::DbType::Postgres;
|
||||
|
||||
// Both features enabled: fall back to URL inspection at runtime
|
||||
#[cfg(all(feature = "sqlite", feature = "postgres"))]
|
||||
let db_type = if config.database_url.starts_with("postgres") {
|
||||
k_core::db::DbType::Postgres
|
||||
} else {
|
||||
k_core::db::DbType::Sqlite
|
||||
};
|
||||
|
||||
let db_config = k_core::db::DatabaseConfig {
|
||||
db_type,
|
||||
url: config.database_url.clone(),
|
||||
max_connections: config.db_max_connections,
|
||||
min_connections: config.db_min_connections,
|
||||
acquire_timeout: StdDuration::from_secs(30),
|
||||
};
|
||||
|
||||
let pool = k_core::db::connect(&db_config).await?;
|
||||
run_migrations(&pool).await?;
|
||||
Ok(Arc::new(pool))
|
||||
}
|
||||
@@ -2,27 +2,18 @@
|
||||
//!
|
||||
//! Configures and starts the HTTP server with JWT-based authentication.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration as StdDuration;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::Router;
|
||||
use axum::http::{HeaderName, HeaderValue};
|
||||
use tokio::sync::broadcast;
|
||||
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
|
||||
use tracing::info;
|
||||
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService};
|
||||
use domain::{ChannelService, IProviderRegistry, ScheduleEngineService, UserService};
|
||||
use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository};
|
||||
#[cfg(feature = "local-files")]
|
||||
use infra::factory::build_transcode_settings_repository;
|
||||
use infra::run_migrations;
|
||||
use k_core::http::server::{ServerConfig, apply_standard_middleware};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
mod config;
|
||||
mod database;
|
||||
mod provider_registry;
|
||||
mod dto;
|
||||
mod error;
|
||||
mod events;
|
||||
@@ -31,57 +22,25 @@ mod log_layer;
|
||||
mod poller;
|
||||
mod routes;
|
||||
mod scheduler;
|
||||
mod server;
|
||||
mod startup;
|
||||
mod state;
|
||||
mod telemetry;
|
||||
mod webhook;
|
||||
|
||||
use crate::config::{Config, ConfigSource};
|
||||
use crate::config::Config;
|
||||
use crate::state::AppState;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// Set up broadcast channel + ring buffer for SSE log streaming.
|
||||
let (log_tx, _) = broadcast::channel::<log_layer::LogLine>(512);
|
||||
let log_history = Arc::new(Mutex::new(VecDeque::<log_layer::LogLine>::new()));
|
||||
|
||||
// Initialize tracing with our custom layer in addition to the fmt layer.
|
||||
tracing_subscriber::registry()
|
||||
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
|
||||
.with(fmt::layer())
|
||||
.with(log_layer::AppLogLayer::new(log_tx.clone(), Arc::clone(&log_history)))
|
||||
.init();
|
||||
let handles = telemetry::init_tracing();
|
||||
|
||||
let config = Config::from_env();
|
||||
|
||||
info!("Starting server on {}:{}", config.host, config.port);
|
||||
|
||||
// Setup database
|
||||
tracing::info!("Connecting to database: {}", config.database_url);
|
||||
|
||||
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
|
||||
let db_type = k_core::db::DbType::Sqlite;
|
||||
|
||||
#[cfg(all(feature = "postgres", not(feature = "sqlite")))]
|
||||
let db_type = k_core::db::DbType::Postgres;
|
||||
|
||||
// Both features enabled: fall back to URL inspection at runtime
|
||||
#[cfg(all(feature = "sqlite", feature = "postgres"))]
|
||||
let db_type = if config.database_url.starts_with("postgres") {
|
||||
k_core::db::DbType::Postgres
|
||||
} else {
|
||||
k_core::db::DbType::Sqlite
|
||||
};
|
||||
|
||||
let db_config = k_core::db::DatabaseConfig {
|
||||
db_type,
|
||||
url: config.database_url.clone(),
|
||||
max_connections: config.db_max_connections,
|
||||
min_connections: config.db_min_connections,
|
||||
acquire_timeout: StdDuration::from_secs(30),
|
||||
};
|
||||
|
||||
let db_pool = k_core::db::connect(&db_config).await?;
|
||||
run_migrations(&db_pool).await?;
|
||||
let db_pool = Arc::new(db_pool);
|
||||
let db_pool = database::init_database(&config).await?;
|
||||
|
||||
let user_repo = build_user_repository(&db_pool).await?;
|
||||
let channel_repo = build_channel_repository(&db_pool).await?;
|
||||
@@ -92,129 +51,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
let channel_service = ChannelService::new(channel_repo.clone());
|
||||
|
||||
// Build provider registry — all configured providers are registered simultaneously.
|
||||
#[cfg(feature = "local-files")]
|
||||
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
|
||||
#[cfg(feature = "local-files")]
|
||||
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
|
||||
|
||||
let mut registry = infra::ProviderRegistry::new();
|
||||
|
||||
let provider_config_repo = build_provider_config_repository(&db_pool).await?;
|
||||
|
||||
if config.config_source == ConfigSource::Db {
|
||||
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
|
||||
let rows = provider_config_repo.get_all().await?;
|
||||
for row in &rows {
|
||||
if !row.enabled { continue; }
|
||||
match row.provider_type.as_str() {
|
||||
#[cfg(feature = "jellyfin")]
|
||||
"jellyfin" => {
|
||||
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
|
||||
tracing::info!("Loading Jellyfin provider from DB config");
|
||||
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "local-files")]
|
||||
"local_files" => {
|
||||
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
|
||||
if let Some(files_dir) = cfg_map.get("files_dir") {
|
||||
let transcode_dir = cfg_map.get("transcode_dir")
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(std::path::PathBuf::from);
|
||||
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(24);
|
||||
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
|
||||
match infra::factory::build_local_files_bundle(
|
||||
&db_pool,
|
||||
std::path::PathBuf::from(files_dir),
|
||||
transcode_dir,
|
||||
cleanup_ttl_hours,
|
||||
config.base_url.clone(),
|
||||
).await {
|
||||
Ok(bundle) => {
|
||||
let scan_idx = Arc::clone(&bundle.local_index);
|
||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||
if let Some(ref tm) = bundle.transcode_manager {
|
||||
tracing::info!("Transcoding enabled");
|
||||
// Load persisted TTL override from transcode_settings table.
|
||||
let tm_clone = Arc::clone(tm);
|
||||
let repo = build_transcode_settings_repository(&db_pool).await.ok();
|
||||
tokio::spawn(async move {
|
||||
if let Some(r) = repo {
|
||||
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
||||
tm_clone.set_cleanup_ttl(ttl);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
registry.register("local", bundle.provider);
|
||||
transcode_manager = bundle.transcode_manager;
|
||||
local_index = Some(bundle.local_index);
|
||||
}
|
||||
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
#[cfg(feature = "jellyfin")]
|
||||
if let (Some(base_url), Some(api_key), Some(user_id)) = (
|
||||
&config.jellyfin_base_url,
|
||||
&config.jellyfin_api_key,
|
||||
&config.jellyfin_user_id,
|
||||
) {
|
||||
tracing::info!("Media provider: Jellyfin at {}", base_url);
|
||||
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
|
||||
base_url: base_url.clone(),
|
||||
api_key: api_key.clone(),
|
||||
user_id: user_id.clone(),
|
||||
})));
|
||||
}
|
||||
let bundle = provider_registry::build_provider_registry(
|
||||
&config, &db_pool, &provider_config_repo,
|
||||
).await?;
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
if let Some(dir) = &config.local_files_dir {
|
||||
tracing::info!("Media provider: local files at {:?}", dir);
|
||||
match infra::factory::build_local_files_bundle(
|
||||
&db_pool,
|
||||
dir.clone(),
|
||||
config.transcode_dir.clone(),
|
||||
config.transcode_cleanup_ttl_hours,
|
||||
config.base_url.clone(),
|
||||
).await {
|
||||
Ok(bundle) => {
|
||||
let scan_idx = Arc::clone(&bundle.local_index);
|
||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||
if let Some(ref tm) = bundle.transcode_manager {
|
||||
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
|
||||
let tm_clone = Arc::clone(tm);
|
||||
let repo = build_transcode_settings_repository(&db_pool).await.ok();
|
||||
tokio::spawn(async move {
|
||||
if let Some(r) = repo {
|
||||
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
||||
tm_clone.set_cleanup_ttl(ttl);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
registry.register("local", bundle.provider);
|
||||
transcode_manager = bundle.transcode_manager;
|
||||
local_index = Some(bundle.local_index);
|
||||
}
|
||||
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if registry.is_empty() {
|
||||
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
|
||||
registry.register("noop", Arc::new(NoopMediaProvider));
|
||||
}
|
||||
|
||||
let registry_arc = Arc::new(registry);
|
||||
let registry_arc = bundle.registry;
|
||||
let provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>> =
|
||||
Arc::new(tokio::sync::RwLock::new(Arc::clone(®istry_arc)));
|
||||
|
||||
@@ -246,8 +89,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
provider_config_repo,
|
||||
config.clone(),
|
||||
event_tx.clone(),
|
||||
log_tx,
|
||||
log_history,
|
||||
handles.log_tx,
|
||||
handles.log_history,
|
||||
activity_log_repo,
|
||||
db_pool,
|
||||
#[cfg(feature = "local-files")]
|
||||
@@ -256,118 +99,19 @@ async fn main() -> anyhow::Result<()> {
|
||||
.await?;
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
if let Some(idx) = local_index {
|
||||
if let Some(idx) = bundle.local_index {
|
||||
*state.local_index.write().await = Some(idx);
|
||||
}
|
||||
#[cfg(feature = "local-files")]
|
||||
if let Some(tm) = transcode_manager {
|
||||
if let Some(tm) = bundle.transcode_manager {
|
||||
*state.transcode_manager.write().await = Some(tm);
|
||||
}
|
||||
|
||||
let server_config = ServerConfig {
|
||||
cors_origins: config.cors_allowed_origins.clone(),
|
||||
};
|
||||
|
||||
let bg_channel_repo_poller = bg_channel_repo.clone();
|
||||
let bg_schedule_engine = Arc::clone(&state.schedule_engine);
|
||||
tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo, event_tx.clone()));
|
||||
|
||||
let bg_schedule_engine_poller = Arc::clone(&state.schedule_engine);
|
||||
tokio::spawn(poller::run_broadcast_poller(
|
||||
bg_schedule_engine_poller,
|
||||
bg_channel_repo_poller,
|
||||
startup::spawn_background_tasks(
|
||||
Arc::clone(&state.schedule_engine),
|
||||
bg_channel_repo,
|
||||
event_tx,
|
||||
));
|
||||
);
|
||||
|
||||
let app = Router::new()
|
||||
.nest("/api/v1", routes::api_v1_router())
|
||||
.with_state(state);
|
||||
|
||||
let app = apply_standard_middleware(app, &server_config);
|
||||
|
||||
// Wrap with an outer CorsLayer that includes the custom password headers.
|
||||
// Being outermost it handles OPTIONS preflights before k_core's inner layer.
|
||||
let origins: Vec<HeaderValue> = config
|
||||
.cors_allowed_origins
|
||||
.iter()
|
||||
.filter_map(|o| o.parse().ok())
|
||||
.collect();
|
||||
let cors = CorsLayer::new()
|
||||
.allow_origin(AllowOrigin::list(origins))
|
||||
.allow_methods(AllowMethods::any())
|
||||
.allow_headers(AllowHeaders::list([
|
||||
axum::http::header::AUTHORIZATION,
|
||||
axum::http::header::CONTENT_TYPE,
|
||||
HeaderName::from_static("x-channel-password"),
|
||||
HeaderName::from_static("x-block-password"),
|
||||
]));
|
||||
let app = app.layer(cors);
|
||||
|
||||
let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
tracing::info!("🚀 API server running at http://{}", addr);
|
||||
tracing::info!("🔒 Authentication mode: JWT (Bearer token)");
|
||||
|
||||
#[cfg(feature = "auth-jwt")]
|
||||
tracing::info!(" ✓ JWT auth enabled");
|
||||
|
||||
#[cfg(feature = "auth-oidc")]
|
||||
tracing::info!(" ✓ OIDC integration enabled (stateless cookie state)");
|
||||
|
||||
tracing::info!("📝 API endpoints available at /api/v1/...");
|
||||
|
||||
axum::serve(listener, app).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stand-in provider used when no real media source is configured.
|
||||
/// Returns a descriptive error for every call so schedule endpoints fail
|
||||
/// gracefully rather than panicking at startup.
|
||||
struct NoopMediaProvider;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl IMediaProvider for NoopMediaProvider {
|
||||
fn capabilities(&self) -> ProviderCapabilities {
|
||||
ProviderCapabilities {
|
||||
collections: false,
|
||||
series: false,
|
||||
genres: false,
|
||||
tags: false,
|
||||
decade: false,
|
||||
search: false,
|
||||
streaming_protocol: StreamingProtocol::DirectFile,
|
||||
rescan: false,
|
||||
transcode: false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_items(
|
||||
&self,
|
||||
_: &domain::MediaFilter,
|
||||
) -> domain::DomainResult<Vec<domain::MediaItem>> {
|
||||
Err(domain::DomainError::InfrastructureError(
|
||||
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn fetch_by_id(
|
||||
&self,
|
||||
_: &domain::MediaItemId,
|
||||
) -> domain::DomainResult<Option<domain::MediaItem>> {
|
||||
Err(domain::DomainError::InfrastructureError(
|
||||
"No media provider configured.".into(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn get_stream_url(
|
||||
&self,
|
||||
_: &domain::MediaItemId,
|
||||
_: &domain::StreamQuality,
|
||||
) -> domain::DomainResult<String> {
|
||||
Err(domain::DomainError::InfrastructureError(
|
||||
"No media provider configured.".into(),
|
||||
))
|
||||
}
|
||||
server::build_and_serve(state, &config).await
|
||||
}
|
||||
|
||||
205
k-tv-backend/api/src/provider_registry.rs
Normal file
205
k-tv-backend/api/src/provider_registry.rs
Normal file
@@ -0,0 +1,205 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use domain::{
|
||||
DomainError, IMediaProvider, ProviderCapabilities, ProviderConfigRepository,
|
||||
StreamingProtocol, StreamQuality,
|
||||
};
|
||||
use k_core::db::DatabasePool;
|
||||
|
||||
use crate::config::{Config, ConfigSource};
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
use infra::factory::build_transcode_settings_repository;
|
||||
|
||||
pub struct ProviderBundle {
|
||||
pub registry: Arc<infra::ProviderRegistry>,
|
||||
#[cfg(feature = "local-files")]
|
||||
pub local_index: Option<Arc<infra::LocalIndex>>,
|
||||
#[cfg(feature = "local-files")]
|
||||
pub transcode_manager: Option<Arc<infra::TranscodeManager>>,
|
||||
}
|
||||
|
||||
pub async fn build_provider_registry(
|
||||
config: &Config,
|
||||
#[cfg_attr(not(feature = "local-files"), allow(unused_variables))]
|
||||
db_pool: &Arc<DatabasePool>,
|
||||
provider_config_repo: &Arc<dyn ProviderConfigRepository>,
|
||||
) -> anyhow::Result<ProviderBundle> {
|
||||
#[cfg(feature = "local-files")]
|
||||
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
|
||||
#[cfg(feature = "local-files")]
|
||||
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
|
||||
|
||||
let mut registry = infra::ProviderRegistry::new();
|
||||
|
||||
if config.config_source == ConfigSource::Db {
|
||||
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
|
||||
let rows = provider_config_repo.get_all().await?;
|
||||
for row in &rows {
|
||||
if !row.enabled { continue; }
|
||||
match row.provider_type.as_str() {
|
||||
#[cfg(feature = "jellyfin")]
|
||||
"jellyfin" => {
|
||||
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
|
||||
tracing::info!("Loading Jellyfin provider from DB config");
|
||||
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "local-files")]
|
||||
"local_files" => {
|
||||
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
|
||||
if let Some(files_dir) = cfg_map.get("files_dir") {
|
||||
let transcode_dir = cfg_map.get("transcode_dir")
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(std::path::PathBuf::from);
|
||||
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(24);
|
||||
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
|
||||
match infra::factory::build_local_files_bundle(
|
||||
db_pool,
|
||||
std::path::PathBuf::from(files_dir),
|
||||
transcode_dir,
|
||||
cleanup_ttl_hours,
|
||||
config.base_url.clone(),
|
||||
).await {
|
||||
Ok(bundle) => {
|
||||
let scan_idx = Arc::clone(&bundle.local_index);
|
||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||
if let Some(ref tm) = bundle.transcode_manager {
|
||||
tracing::info!("Transcoding enabled");
|
||||
// Load persisted TTL override from transcode_settings table.
|
||||
let tm_clone = Arc::clone(tm);
|
||||
let repo = build_transcode_settings_repository(db_pool).await.ok();
|
||||
tokio::spawn(async move {
|
||||
if let Some(r) = repo {
|
||||
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
||||
tm_clone.set_cleanup_ttl(ttl);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
registry.register("local", bundle.provider);
|
||||
transcode_manager = bundle.transcode_manager;
|
||||
local_index = Some(bundle.local_index);
|
||||
}
|
||||
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
#[cfg(feature = "jellyfin")]
|
||||
if let (Some(base_url), Some(api_key), Some(user_id)) = (
|
||||
&config.jellyfin_base_url,
|
||||
&config.jellyfin_api_key,
|
||||
&config.jellyfin_user_id,
|
||||
) {
|
||||
tracing::info!("Media provider: Jellyfin at {}", base_url);
|
||||
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
|
||||
base_url: base_url.clone(),
|
||||
api_key: api_key.clone(),
|
||||
user_id: user_id.clone(),
|
||||
})));
|
||||
}
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
if let Some(dir) = &config.local_files_dir {
|
||||
tracing::info!("Media provider: local files at {:?}", dir);
|
||||
match infra::factory::build_local_files_bundle(
|
||||
db_pool,
|
||||
dir.clone(),
|
||||
config.transcode_dir.clone(),
|
||||
config.transcode_cleanup_ttl_hours,
|
||||
config.base_url.clone(),
|
||||
).await {
|
||||
Ok(bundle) => {
|
||||
let scan_idx = Arc::clone(&bundle.local_index);
|
||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||
if let Some(ref tm) = bundle.transcode_manager {
|
||||
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
|
||||
let tm_clone = Arc::clone(tm);
|
||||
let repo = build_transcode_settings_repository(db_pool).await.ok();
|
||||
tokio::spawn(async move {
|
||||
if let Some(r) = repo {
|
||||
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
||||
tm_clone.set_cleanup_ttl(ttl);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
registry.register("local", bundle.provider);
|
||||
transcode_manager = bundle.transcode_manager;
|
||||
local_index = Some(bundle.local_index);
|
||||
}
|
||||
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if registry.is_empty() {
|
||||
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
|
||||
registry.register("noop", Arc::new(NoopMediaProvider));
|
||||
}
|
||||
|
||||
Ok(ProviderBundle {
|
||||
registry: Arc::new(registry),
|
||||
#[cfg(feature = "local-files")]
|
||||
local_index,
|
||||
#[cfg(feature = "local-files")]
|
||||
transcode_manager,
|
||||
})
|
||||
}
|
||||
|
||||
/// Stand-in provider used when no real media source is configured.
|
||||
/// Returns a descriptive error for every call so schedule endpoints fail
|
||||
/// gracefully rather than panicking at startup.
|
||||
struct NoopMediaProvider;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl IMediaProvider for NoopMediaProvider {
|
||||
fn capabilities(&self) -> ProviderCapabilities {
|
||||
ProviderCapabilities {
|
||||
collections: false,
|
||||
series: false,
|
||||
genres: false,
|
||||
tags: false,
|
||||
decade: false,
|
||||
search: false,
|
||||
streaming_protocol: StreamingProtocol::DirectFile,
|
||||
rescan: false,
|
||||
transcode: false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_items(
|
||||
&self,
|
||||
_: &domain::MediaFilter,
|
||||
) -> domain::DomainResult<Vec<domain::MediaItem>> {
|
||||
Err(DomainError::InfrastructureError(
|
||||
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn fetch_by_id(
|
||||
&self,
|
||||
_: &domain::MediaItemId,
|
||||
) -> domain::DomainResult<Option<domain::MediaItem>> {
|
||||
Err(DomainError::InfrastructureError(
|
||||
"No media provider configured.".into(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn get_stream_url(
|
||||
&self,
|
||||
_: &domain::MediaItemId,
|
||||
_: &StreamQuality,
|
||||
) -> domain::DomainResult<String> {
|
||||
Err(DomainError::InfrastructureError(
|
||||
"No media provider configured.".into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
59
k-tv-backend/api/src/server.rs
Normal file
59
k-tv-backend/api/src/server.rs
Normal file
@@ -0,0 +1,59 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use axum::Router;
|
||||
use axum::http::{HeaderName, HeaderValue};
|
||||
use k_core::http::server::{ServerConfig, apply_standard_middleware};
|
||||
use tokio::net::TcpListener;
|
||||
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::routes;
|
||||
use crate::state::AppState;
|
||||
|
||||
pub async fn build_and_serve(state: AppState, config: &Config) -> anyhow::Result<()> {
|
||||
let server_config = ServerConfig {
|
||||
cors_origins: config.cors_allowed_origins.clone(),
|
||||
};
|
||||
|
||||
let app = Router::new()
|
||||
.nest("/api/v1", routes::api_v1_router())
|
||||
.with_state(state);
|
||||
|
||||
let app = apply_standard_middleware(app, &server_config);
|
||||
|
||||
// Wrap with an outer CorsLayer that includes the custom password headers.
|
||||
// Being outermost it handles OPTIONS preflights before k_core's inner layer.
|
||||
let origins: Vec<HeaderValue> = config
|
||||
.cors_allowed_origins
|
||||
.iter()
|
||||
.filter_map(|o| o.parse().ok())
|
||||
.collect();
|
||||
let cors = CorsLayer::new()
|
||||
.allow_origin(AllowOrigin::list(origins))
|
||||
.allow_methods(AllowMethods::any())
|
||||
.allow_headers(AllowHeaders::list([
|
||||
axum::http::header::AUTHORIZATION,
|
||||
axum::http::header::CONTENT_TYPE,
|
||||
HeaderName::from_static("x-channel-password"),
|
||||
HeaderName::from_static("x-block-password"),
|
||||
]));
|
||||
let app = app.layer(cors);
|
||||
|
||||
let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
tracing::info!("🚀 API server running at http://{}", addr);
|
||||
tracing::info!("🔒 Authentication mode: JWT (Bearer token)");
|
||||
|
||||
#[cfg(feature = "auth-jwt")]
|
||||
tracing::info!(" ✓ JWT auth enabled");
|
||||
|
||||
#[cfg(feature = "auth-oidc")]
|
||||
tracing::info!(" ✓ OIDC integration enabled (stateless cookie state)");
|
||||
|
||||
tracing::info!("📝 API endpoints available at /api/v1/...");
|
||||
|
||||
axum::serve(listener, app).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
24
k-tv-backend/api/src/startup.rs
Normal file
24
k-tv-backend/api/src/startup.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use domain::{ChannelRepository, DomainEvent, ScheduleEngineService};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::{poller, scheduler};
|
||||
|
||||
pub fn spawn_background_tasks(
|
||||
schedule_engine: Arc<ScheduleEngineService>,
|
||||
channel_repo: Arc<dyn ChannelRepository>,
|
||||
event_tx: broadcast::Sender<DomainEvent>,
|
||||
) {
|
||||
let bg_channel_repo = channel_repo.clone();
|
||||
tokio::spawn(scheduler::run_auto_scheduler(
|
||||
Arc::clone(&schedule_engine),
|
||||
bg_channel_repo,
|
||||
event_tx.clone(),
|
||||
));
|
||||
tokio::spawn(poller::run_broadcast_poller(
|
||||
schedule_engine,
|
||||
channel_repo,
|
||||
event_tx,
|
||||
));
|
||||
}
|
||||
25
k-tv-backend/api/src/telemetry.rs
Normal file
25
k-tv-backend/api/src/telemetry.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use tokio::sync::broadcast;
|
||||
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
use crate::log_layer::{AppLogLayer, LogLine};
|
||||
|
||||
pub struct LoggingHandles {
|
||||
pub log_tx: broadcast::Sender<LogLine>,
|
||||
pub log_history: Arc<Mutex<VecDeque<LogLine>>>,
|
||||
}
|
||||
|
||||
pub fn init_tracing() -> LoggingHandles {
|
||||
let (log_tx, _) = broadcast::channel::<LogLine>(512);
|
||||
let log_history = Arc::new(Mutex::new(VecDeque::<LogLine>::new()));
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
|
||||
.with(fmt::layer())
|
||||
.with(AppLogLayer::new(log_tx.clone(), Arc::clone(&log_history)))
|
||||
.init();
|
||||
|
||||
LoggingHandles { log_tx, log_history }
|
||||
}
|
||||
Reference in New Issue
Block a user