363 lines
14 KiB
Rust
363 lines
14 KiB
Rust
//! API Server Entry Point
|
|
//!
|
|
//! Configures and starts the HTTP server with JWT-based authentication.
|
|
|
|
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
use std::time::Duration as StdDuration;
|
|
|
|
use axum::Router;
|
|
use axum::http::{HeaderName, HeaderValue};
|
|
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
|
|
use tracing::info;
|
|
|
|
use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService};
|
|
use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository};
|
|
#[cfg(feature = "local-files")]
|
|
use infra::factory::build_transcode_settings_repository;
|
|
use infra::run_migrations;
|
|
use k_core::http::server::{ServerConfig, apply_standard_middleware};
|
|
use tokio::net::TcpListener;
|
|
|
|
mod config;
|
|
mod dto;
|
|
mod error;
|
|
mod events;
|
|
mod extractors;
|
|
mod log_layer;
|
|
mod poller;
|
|
mod routes;
|
|
mod scheduler;
|
|
mod state;
|
|
mod telemetry;
|
|
mod webhook;
|
|
|
|
use crate::config::{Config, ConfigSource};
|
|
use crate::state::AppState;
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
let handles = telemetry::init_tracing();
|
|
|
|
let config = Config::from_env();
|
|
|
|
info!("Starting server on {}:{}", config.host, config.port);
|
|
|
|
// Setup database
|
|
tracing::info!("Connecting to database: {}", config.database_url);
|
|
|
|
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
|
|
let db_type = k_core::db::DbType::Sqlite;
|
|
|
|
#[cfg(all(feature = "postgres", not(feature = "sqlite")))]
|
|
let db_type = k_core::db::DbType::Postgres;
|
|
|
|
// Both features enabled: fall back to URL inspection at runtime
|
|
#[cfg(all(feature = "sqlite", feature = "postgres"))]
|
|
let db_type = if config.database_url.starts_with("postgres") {
|
|
k_core::db::DbType::Postgres
|
|
} else {
|
|
k_core::db::DbType::Sqlite
|
|
};
|
|
|
|
let db_config = k_core::db::DatabaseConfig {
|
|
db_type,
|
|
url: config.database_url.clone(),
|
|
max_connections: config.db_max_connections,
|
|
min_connections: config.db_min_connections,
|
|
acquire_timeout: StdDuration::from_secs(30),
|
|
};
|
|
|
|
let db_pool = k_core::db::connect(&db_config).await?;
|
|
run_migrations(&db_pool).await?;
|
|
let db_pool = Arc::new(db_pool);
|
|
|
|
let user_repo = build_user_repository(&db_pool).await?;
|
|
let channel_repo = build_channel_repository(&db_pool).await?;
|
|
let schedule_repo = build_schedule_repository(&db_pool).await?;
|
|
let activity_log_repo = build_activity_log_repository(&db_pool).await?;
|
|
|
|
let user_service = UserService::new(user_repo);
|
|
let channel_service = ChannelService::new(channel_repo.clone());
|
|
|
|
// Build provider registry — all configured providers are registered simultaneously.
|
|
#[cfg(feature = "local-files")]
|
|
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
|
|
#[cfg(feature = "local-files")]
|
|
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
|
|
|
|
let mut registry = infra::ProviderRegistry::new();
|
|
|
|
let provider_config_repo = build_provider_config_repository(&db_pool).await?;
|
|
|
|
if config.config_source == ConfigSource::Db {
|
|
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
|
|
let rows = provider_config_repo.get_all().await?;
|
|
for row in &rows {
|
|
if !row.enabled { continue; }
|
|
match row.provider_type.as_str() {
|
|
#[cfg(feature = "jellyfin")]
|
|
"jellyfin" => {
|
|
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
|
|
tracing::info!("Loading Jellyfin provider from DB config");
|
|
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
|
|
}
|
|
}
|
|
#[cfg(feature = "local-files")]
|
|
"local_files" => {
|
|
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
|
|
if let Some(files_dir) = cfg_map.get("files_dir") {
|
|
let transcode_dir = cfg_map.get("transcode_dir")
|
|
.filter(|s| !s.is_empty())
|
|
.map(std::path::PathBuf::from);
|
|
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
|
|
.and_then(|s| s.parse().ok())
|
|
.unwrap_or(24);
|
|
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
|
|
match infra::factory::build_local_files_bundle(
|
|
&db_pool,
|
|
std::path::PathBuf::from(files_dir),
|
|
transcode_dir,
|
|
cleanup_ttl_hours,
|
|
config.base_url.clone(),
|
|
).await {
|
|
Ok(bundle) => {
|
|
let scan_idx = Arc::clone(&bundle.local_index);
|
|
tokio::spawn(async move { scan_idx.rescan().await; });
|
|
if let Some(ref tm) = bundle.transcode_manager {
|
|
tracing::info!("Transcoding enabled");
|
|
// Load persisted TTL override from transcode_settings table.
|
|
let tm_clone = Arc::clone(tm);
|
|
let repo = build_transcode_settings_repository(&db_pool).await.ok();
|
|
tokio::spawn(async move {
|
|
if let Some(r) = repo {
|
|
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
|
tm_clone.set_cleanup_ttl(ttl);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
registry.register("local", bundle.provider);
|
|
transcode_manager = bundle.transcode_manager;
|
|
local_index = Some(bundle.local_index);
|
|
}
|
|
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
} else {
|
|
#[cfg(feature = "jellyfin")]
|
|
if let (Some(base_url), Some(api_key), Some(user_id)) = (
|
|
&config.jellyfin_base_url,
|
|
&config.jellyfin_api_key,
|
|
&config.jellyfin_user_id,
|
|
) {
|
|
tracing::info!("Media provider: Jellyfin at {}", base_url);
|
|
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
|
|
base_url: base_url.clone(),
|
|
api_key: api_key.clone(),
|
|
user_id: user_id.clone(),
|
|
})));
|
|
}
|
|
|
|
#[cfg(feature = "local-files")]
|
|
if let Some(dir) = &config.local_files_dir {
|
|
tracing::info!("Media provider: local files at {:?}", dir);
|
|
match infra::factory::build_local_files_bundle(
|
|
&db_pool,
|
|
dir.clone(),
|
|
config.transcode_dir.clone(),
|
|
config.transcode_cleanup_ttl_hours,
|
|
config.base_url.clone(),
|
|
).await {
|
|
Ok(bundle) => {
|
|
let scan_idx = Arc::clone(&bundle.local_index);
|
|
tokio::spawn(async move { scan_idx.rescan().await; });
|
|
if let Some(ref tm) = bundle.transcode_manager {
|
|
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
|
|
let tm_clone = Arc::clone(tm);
|
|
let repo = build_transcode_settings_repository(&db_pool).await.ok();
|
|
tokio::spawn(async move {
|
|
if let Some(r) = repo {
|
|
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
|
tm_clone.set_cleanup_ttl(ttl);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
registry.register("local", bundle.provider);
|
|
transcode_manager = bundle.transcode_manager;
|
|
local_index = Some(bundle.local_index);
|
|
}
|
|
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
|
|
}
|
|
}
|
|
}
|
|
|
|
if registry.is_empty() {
|
|
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
|
|
registry.register("noop", Arc::new(NoopMediaProvider));
|
|
}
|
|
|
|
let registry_arc = Arc::new(registry);
|
|
let provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>> =
|
|
Arc::new(tokio::sync::RwLock::new(Arc::clone(®istry_arc)));
|
|
|
|
let (event_tx, event_rx) = tokio::sync::broadcast::channel::<domain::DomainEvent>(64);
|
|
|
|
let bg_channel_repo = channel_repo.clone();
|
|
let webhook_channel_repo = channel_repo.clone();
|
|
tokio::spawn(webhook::run_webhook_consumer(
|
|
event_rx,
|
|
webhook_channel_repo,
|
|
reqwest::Client::new(),
|
|
));
|
|
|
|
let schedule_engine = ScheduleEngineService::new(
|
|
Arc::clone(®istry_arc) as Arc<dyn IProviderRegistry>,
|
|
channel_repo,
|
|
schedule_repo,
|
|
);
|
|
|
|
#[cfg(feature = "local-files")]
|
|
let transcode_settings_repo = build_transcode_settings_repository(&db_pool).await.ok();
|
|
|
|
#[allow(unused_mut)]
|
|
let mut state = AppState::new(
|
|
user_service,
|
|
channel_service,
|
|
schedule_engine,
|
|
provider_registry,
|
|
provider_config_repo,
|
|
config.clone(),
|
|
event_tx.clone(),
|
|
handles.log_tx,
|
|
handles.log_history,
|
|
activity_log_repo,
|
|
db_pool,
|
|
#[cfg(feature = "local-files")]
|
|
transcode_settings_repo,
|
|
)
|
|
.await?;
|
|
|
|
#[cfg(feature = "local-files")]
|
|
if let Some(idx) = local_index {
|
|
*state.local_index.write().await = Some(idx);
|
|
}
|
|
#[cfg(feature = "local-files")]
|
|
if let Some(tm) = transcode_manager {
|
|
*state.transcode_manager.write().await = Some(tm);
|
|
}
|
|
|
|
let server_config = ServerConfig {
|
|
cors_origins: config.cors_allowed_origins.clone(),
|
|
};
|
|
|
|
let bg_channel_repo_poller = bg_channel_repo.clone();
|
|
let bg_schedule_engine = Arc::clone(&state.schedule_engine);
|
|
tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo, event_tx.clone()));
|
|
|
|
let bg_schedule_engine_poller = Arc::clone(&state.schedule_engine);
|
|
tokio::spawn(poller::run_broadcast_poller(
|
|
bg_schedule_engine_poller,
|
|
bg_channel_repo_poller,
|
|
event_tx,
|
|
));
|
|
|
|
let app = Router::new()
|
|
.nest("/api/v1", routes::api_v1_router())
|
|
.with_state(state);
|
|
|
|
let app = apply_standard_middleware(app, &server_config);
|
|
|
|
// Wrap with an outer CorsLayer that includes the custom password headers.
|
|
// Being outermost it handles OPTIONS preflights before k_core's inner layer.
|
|
let origins: Vec<HeaderValue> = config
|
|
.cors_allowed_origins
|
|
.iter()
|
|
.filter_map(|o| o.parse().ok())
|
|
.collect();
|
|
let cors = CorsLayer::new()
|
|
.allow_origin(AllowOrigin::list(origins))
|
|
.allow_methods(AllowMethods::any())
|
|
.allow_headers(AllowHeaders::list([
|
|
axum::http::header::AUTHORIZATION,
|
|
axum::http::header::CONTENT_TYPE,
|
|
HeaderName::from_static("x-channel-password"),
|
|
HeaderName::from_static("x-block-password"),
|
|
]));
|
|
let app = app.layer(cors);
|
|
|
|
let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
|
|
let listener = TcpListener::bind(addr).await?;
|
|
|
|
tracing::info!("🚀 API server running at http://{}", addr);
|
|
tracing::info!("🔒 Authentication mode: JWT (Bearer token)");
|
|
|
|
#[cfg(feature = "auth-jwt")]
|
|
tracing::info!(" ✓ JWT auth enabled");
|
|
|
|
#[cfg(feature = "auth-oidc")]
|
|
tracing::info!(" ✓ OIDC integration enabled (stateless cookie state)");
|
|
|
|
tracing::info!("📝 API endpoints available at /api/v1/...");
|
|
|
|
axum::serve(listener, app).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Stand-in provider used when no real media source is configured.
|
|
/// Returns a descriptive error for every call so schedule endpoints fail
|
|
/// gracefully rather than panicking at startup.
|
|
struct NoopMediaProvider;
|
|
|
|
#[async_trait::async_trait]
|
|
impl IMediaProvider for NoopMediaProvider {
|
|
fn capabilities(&self) -> ProviderCapabilities {
|
|
ProviderCapabilities {
|
|
collections: false,
|
|
series: false,
|
|
genres: false,
|
|
tags: false,
|
|
decade: false,
|
|
search: false,
|
|
streaming_protocol: StreamingProtocol::DirectFile,
|
|
rescan: false,
|
|
transcode: false,
|
|
}
|
|
}
|
|
|
|
async fn fetch_items(
|
|
&self,
|
|
_: &domain::MediaFilter,
|
|
) -> domain::DomainResult<Vec<domain::MediaItem>> {
|
|
Err(domain::DomainError::InfrastructureError(
|
|
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
|
|
))
|
|
}
|
|
|
|
async fn fetch_by_id(
|
|
&self,
|
|
_: &domain::MediaItemId,
|
|
) -> domain::DomainResult<Option<domain::MediaItem>> {
|
|
Err(domain::DomainError::InfrastructureError(
|
|
"No media provider configured.".into(),
|
|
))
|
|
}
|
|
|
|
async fn get_stream_url(
|
|
&self,
|
|
_: &domain::MediaItemId,
|
|
_: &domain::StreamQuality,
|
|
) -> domain::DomainResult<String> {
|
|
Err(domain::DomainError::InfrastructureError(
|
|
"No media provider configured.".into(),
|
|
))
|
|
}
|
|
}
|