Compare commits

...

6 Commits

6 changed files with 372 additions and 279 deletions

View File

@@ -0,0 +1,36 @@
use std::sync::Arc;
use std::time::Duration as StdDuration;
use crate::config::Config;
use infra::run_migrations;
use k_core::db::DatabasePool;
/// Connect to the configured database, apply pending migrations, and hand
/// back the pool behind an [`Arc`] so it can be shared across subsystems.
///
/// Backend selection is done at compile time when exactly one of the
/// `sqlite` / `postgres` features is enabled; with both enabled, the URL
/// scheme decides at runtime.
pub async fn init_database(config: &Config) -> anyhow::Result<Arc<DatabasePool>> {
    tracing::info!("Connecting to database: {}", config.database_url);

    // Single backend compiled in: the type is fixed statically.
    #[cfg(all(feature = "sqlite", not(feature = "postgres")))]
    let db_type = k_core::db::DbType::Sqlite;
    #[cfg(all(feature = "postgres", not(feature = "sqlite")))]
    let db_type = k_core::db::DbType::Postgres;

    // Both backends compiled in: decide from the URL scheme at runtime.
    #[cfg(all(feature = "sqlite", feature = "postgres"))]
    let db_type = {
        let is_postgres_url = config.database_url.starts_with("postgres");
        if is_postgres_url {
            k_core::db::DbType::Postgres
        } else {
            k_core::db::DbType::Sqlite
        }
    };

    let settings = k_core::db::DatabaseConfig {
        db_type,
        url: config.database_url.clone(),
        max_connections: config.db_max_connections,
        min_connections: config.db_min_connections,
        acquire_timeout: StdDuration::from_secs(30),
    };

    let pool = k_core::db::connect(&settings).await?;
    run_migrations(&pool).await?;
    Ok(Arc::new(pool))
}

View File

@@ -2,27 +2,18 @@
//!
//! Configures and starts the HTTP server with JWT-based authentication.
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration as StdDuration;
use std::sync::Arc;
use axum::Router;
use axum::http::{HeaderName, HeaderValue};
use tokio::sync::broadcast;
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
use tracing::info;
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService};
use domain::{ChannelService, IProviderRegistry, ScheduleEngineService, UserService};
use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository};
#[cfg(feature = "local-files")]
use infra::factory::build_transcode_settings_repository;
use infra::run_migrations;
use k_core::http::server::{ServerConfig, apply_standard_middleware};
use tokio::net::TcpListener;
mod config;
mod database;
mod provider_registry;
mod dto;
mod error;
mod events;
@@ -31,57 +22,25 @@ mod log_layer;
mod poller;
mod routes;
mod scheduler;
mod server;
mod startup;
mod state;
mod telemetry;
mod webhook;
use crate::config::{Config, ConfigSource};
use crate::config::Config;
use crate::state::AppState;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Set up broadcast channel + ring buffer for SSE log streaming.
let (log_tx, _) = broadcast::channel::<log_layer::LogLine>(512);
let log_history = Arc::new(Mutex::new(VecDeque::<log_layer::LogLine>::new()));
// Initialize tracing with our custom layer in addition to the fmt layer.
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(fmt::layer())
.with(log_layer::AppLogLayer::new(log_tx.clone(), Arc::clone(&log_history)))
.init();
let handles = telemetry::init_tracing();
let config = Config::from_env();
info!("Starting server on {}:{}", config.host, config.port);
// Setup database
tracing::info!("Connecting to database: {}", config.database_url);
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
let db_type = k_core::db::DbType::Sqlite;
#[cfg(all(feature = "postgres", not(feature = "sqlite")))]
let db_type = k_core::db::DbType::Postgres;
// Both features enabled: fall back to URL inspection at runtime
#[cfg(all(feature = "sqlite", feature = "postgres"))]
let db_type = if config.database_url.starts_with("postgres") {
k_core::db::DbType::Postgres
} else {
k_core::db::DbType::Sqlite
};
let db_config = k_core::db::DatabaseConfig {
db_type,
url: config.database_url.clone(),
max_connections: config.db_max_connections,
min_connections: config.db_min_connections,
acquire_timeout: StdDuration::from_secs(30),
};
let db_pool = k_core::db::connect(&db_config).await?;
run_migrations(&db_pool).await?;
let db_pool = Arc::new(db_pool);
let db_pool = database::init_database(&config).await?;
let user_repo = build_user_repository(&db_pool).await?;
let channel_repo = build_channel_repository(&db_pool).await?;
@@ -92,129 +51,13 @@ async fn main() -> anyhow::Result<()> {
let channel_service = ChannelService::new(channel_repo.clone());
// Build provider registry — all configured providers are registered simultaneously.
#[cfg(feature = "local-files")]
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
#[cfg(feature = "local-files")]
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
let mut registry = infra::ProviderRegistry::new();
let provider_config_repo = build_provider_config_repository(&db_pool).await?;
if config.config_source == ConfigSource::Db {
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
let rows = provider_config_repo.get_all().await?;
for row in &rows {
if !row.enabled { continue; }
match row.provider_type.as_str() {
#[cfg(feature = "jellyfin")]
"jellyfin" => {
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
tracing::info!("Loading Jellyfin provider from DB config");
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
}
}
#[cfg(feature = "local-files")]
"local_files" => {
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
if let Some(files_dir) = cfg_map.get("files_dir") {
let transcode_dir = cfg_map.get("transcode_dir")
.filter(|s| !s.is_empty())
.map(std::path::PathBuf::from);
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
.and_then(|s| s.parse().ok())
.unwrap_or(24);
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
match infra::factory::build_local_files_bundle(
&db_pool,
std::path::PathBuf::from(files_dir),
transcode_dir,
cleanup_ttl_hours,
config.base_url.clone(),
).await {
Ok(bundle) => {
let scan_idx = Arc::clone(&bundle.local_index);
tokio::spawn(async move { scan_idx.rescan().await; });
if let Some(ref tm) = bundle.transcode_manager {
tracing::info!("Transcoding enabled");
// Load persisted TTL override from transcode_settings table.
let tm_clone = Arc::clone(tm);
let repo = build_transcode_settings_repository(&db_pool).await.ok();
tokio::spawn(async move {
if let Some(r) = repo {
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
tm_clone.set_cleanup_ttl(ttl);
}
}
});
}
registry.register("local", bundle.provider);
transcode_manager = bundle.transcode_manager;
local_index = Some(bundle.local_index);
}
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
}
}
}
}
_ => {}
}
}
} else {
#[cfg(feature = "jellyfin")]
if let (Some(base_url), Some(api_key), Some(user_id)) = (
&config.jellyfin_base_url,
&config.jellyfin_api_key,
&config.jellyfin_user_id,
) {
tracing::info!("Media provider: Jellyfin at {}", base_url);
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
base_url: base_url.clone(),
api_key: api_key.clone(),
user_id: user_id.clone(),
})));
}
let bundle = provider_registry::build_provider_registry(
&config, &db_pool, &provider_config_repo,
).await?;
#[cfg(feature = "local-files")]
if let Some(dir) = &config.local_files_dir {
tracing::info!("Media provider: local files at {:?}", dir);
match infra::factory::build_local_files_bundle(
&db_pool,
dir.clone(),
config.transcode_dir.clone(),
config.transcode_cleanup_ttl_hours,
config.base_url.clone(),
).await {
Ok(bundle) => {
let scan_idx = Arc::clone(&bundle.local_index);
tokio::spawn(async move { scan_idx.rescan().await; });
if let Some(ref tm) = bundle.transcode_manager {
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
let tm_clone = Arc::clone(tm);
let repo = build_transcode_settings_repository(&db_pool).await.ok();
tokio::spawn(async move {
if let Some(r) = repo {
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
tm_clone.set_cleanup_ttl(ttl);
}
}
});
}
registry.register("local", bundle.provider);
transcode_manager = bundle.transcode_manager;
local_index = Some(bundle.local_index);
}
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
}
}
}
if registry.is_empty() {
tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
registry.register("noop", Arc::new(NoopMediaProvider));
}
let registry_arc = Arc::new(registry);
let registry_arc = bundle.registry;
let provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>> =
Arc::new(tokio::sync::RwLock::new(Arc::clone(&registry_arc)));
@@ -246,8 +89,8 @@ async fn main() -> anyhow::Result<()> {
provider_config_repo,
config.clone(),
event_tx.clone(),
log_tx,
log_history,
handles.log_tx,
handles.log_history,
activity_log_repo,
db_pool,
#[cfg(feature = "local-files")]
@@ -256,118 +99,19 @@ async fn main() -> anyhow::Result<()> {
.await?;
#[cfg(feature = "local-files")]
if let Some(idx) = local_index {
if let Some(idx) = bundle.local_index {
*state.local_index.write().await = Some(idx);
}
#[cfg(feature = "local-files")]
if let Some(tm) = transcode_manager {
if let Some(tm) = bundle.transcode_manager {
*state.transcode_manager.write().await = Some(tm);
}
let server_config = ServerConfig {
cors_origins: config.cors_allowed_origins.clone(),
};
let bg_channel_repo_poller = bg_channel_repo.clone();
let bg_schedule_engine = Arc::clone(&state.schedule_engine);
tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo, event_tx.clone()));
let bg_schedule_engine_poller = Arc::clone(&state.schedule_engine);
tokio::spawn(poller::run_broadcast_poller(
bg_schedule_engine_poller,
bg_channel_repo_poller,
startup::spawn_background_tasks(
Arc::clone(&state.schedule_engine),
bg_channel_repo,
event_tx,
));
);
let app = Router::new()
.nest("/api/v1", routes::api_v1_router())
.with_state(state);
let app = apply_standard_middleware(app, &server_config);
// Wrap with an outer CorsLayer that includes the custom password headers.
// Being outermost it handles OPTIONS preflights before k_core's inner layer.
let origins: Vec<HeaderValue> = config
.cors_allowed_origins
.iter()
.filter_map(|o| o.parse().ok())
.collect();
let cors = CorsLayer::new()
.allow_origin(AllowOrigin::list(origins))
.allow_methods(AllowMethods::any())
.allow_headers(AllowHeaders::list([
axum::http::header::AUTHORIZATION,
axum::http::header::CONTENT_TYPE,
HeaderName::from_static("x-channel-password"),
HeaderName::from_static("x-block-password"),
]));
let app = app.layer(cors);
let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
let listener = TcpListener::bind(addr).await?;
tracing::info!("🚀 API server running at http://{}", addr);
tracing::info!("🔒 Authentication mode: JWT (Bearer token)");
#[cfg(feature = "auth-jwt")]
tracing::info!(" ✓ JWT auth enabled");
#[cfg(feature = "auth-oidc")]
tracing::info!(" ✓ OIDC integration enabled (stateless cookie state)");
tracing::info!("📝 API endpoints available at /api/v1/...");
axum::serve(listener, app).await?;
Ok(())
}
/// Stand-in provider used when no real media source is configured.
/// Returns a descriptive error for every call so schedule endpoints fail
/// gracefully rather than panicking at startup.
struct NoopMediaProvider;
#[async_trait::async_trait]
impl IMediaProvider for NoopMediaProvider {
fn capabilities(&self) -> ProviderCapabilities {
ProviderCapabilities {
collections: false,
series: false,
genres: false,
tags: false,
decade: false,
search: false,
streaming_protocol: StreamingProtocol::DirectFile,
rescan: false,
transcode: false,
}
}
async fn fetch_items(
&self,
_: &domain::MediaFilter,
) -> domain::DomainResult<Vec<domain::MediaItem>> {
Err(domain::DomainError::InfrastructureError(
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
))
}
async fn fetch_by_id(
&self,
_: &domain::MediaItemId,
) -> domain::DomainResult<Option<domain::MediaItem>> {
Err(domain::DomainError::InfrastructureError(
"No media provider configured.".into(),
))
}
async fn get_stream_url(
&self,
_: &domain::MediaItemId,
_: &domain::StreamQuality,
) -> domain::DomainResult<String> {
Err(domain::DomainError::InfrastructureError(
"No media provider configured.".into(),
))
}
server::build_and_serve(state, &config).await
}

View File

@@ -0,0 +1,205 @@
use std::sync::Arc;
use domain::{
DomainError, IMediaProvider, ProviderCapabilities, ProviderConfigRepository,
StreamingProtocol, StreamQuality,
};
use k_core::db::DatabasePool;
use crate::config::{Config, ConfigSource};
#[cfg(feature = "local-files")]
use infra::factory::build_transcode_settings_repository;
/// Everything produced while wiring up media providers: the finished
/// registry plus, when the `local-files` feature is compiled in, the
/// optional local file index and transcode manager created alongside the
/// local provider.
pub struct ProviderBundle {
    /// Registry holding every configured provider (never empty — a noop
    /// stand-in is registered when nothing is configured).
    pub registry: Arc<infra::ProviderRegistry>,
    /// Index of local media files; `None` when no local provider was built.
    #[cfg(feature = "local-files")]
    pub local_index: Option<Arc<infra::LocalIndex>>,
    /// Transcode manager; `None` when the local provider was built without
    /// a transcode directory.
    #[cfg(feature = "local-files")]
    pub transcode_manager: Option<Arc<infra::TranscodeManager>>,
}
/// Build the [`ProviderBundle`] from configuration.
///
/// With `CONFIG_SOURCE=db`, enabled provider rows are loaded from the
/// database; otherwise providers are built from environment-derived
/// `Config` fields. When nothing is configured, a `NoopMediaProvider`
/// stand-in is registered so the returned registry is never empty.
///
/// # Errors
/// Fails if reading provider rows from the repository fails; individual
/// provider construction failures are logged and skipped.
pub async fn build_provider_registry(
    config: &Config,
    // db_pool is only used by the local-files paths; suppress the unused
    // warning when that feature is compiled out.
    #[cfg_attr(not(feature = "local-files"), allow(unused_variables))]
    db_pool: &Arc<DatabasePool>,
    provider_config_repo: &Arc<dyn ProviderConfigRepository>,
) -> anyhow::Result<ProviderBundle> {
    #[cfg(feature = "local-files")]
    let mut local_index: Option<Arc<infra::LocalIndex>> = None;
    #[cfg(feature = "local-files")]
    let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
    let mut registry = infra::ProviderRegistry::new();
    if config.config_source == ConfigSource::Db {
        tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
        let rows = provider_config_repo.get_all().await?;
        for row in &rows {
            // Disabled rows stay in the table but are never instantiated.
            if !row.enabled { continue; }
            match row.provider_type.as_str() {
                #[cfg(feature = "jellyfin")]
                "jellyfin" => {
                    // Malformed config JSON is skipped silently; the row is left as-is.
                    if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
                        tracing::info!("Loading Jellyfin provider from DB config");
                        registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
                    }
                }
                #[cfg(feature = "local-files")]
                "local_files" => {
                    // Local-files config is a flat string map; files_dir is required.
                    if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
                        if let Some(files_dir) = cfg_map.get("files_dir") {
                            // An empty transcode_dir means transcoding is disabled.
                            let transcode_dir = cfg_map.get("transcode_dir")
                                .filter(|s| !s.is_empty())
                                .map(std::path::PathBuf::from);
                            // Default to a 24h cleanup TTL when unset or unparseable.
                            let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
                                .and_then(|s| s.parse().ok())
                                .unwrap_or(24);
                            tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
                            match infra::factory::build_local_files_bundle(
                                db_pool,
                                std::path::PathBuf::from(files_dir),
                                transcode_dir,
                                cleanup_ttl_hours,
                                config.base_url.clone(),
                            ).await {
                                Ok(bundle) => {
                                    // Kick off an initial library scan in the background.
                                    let scan_idx = Arc::clone(&bundle.local_index);
                                    tokio::spawn(async move { scan_idx.rescan().await; });
                                    if let Some(ref tm) = bundle.transcode_manager {
                                        tracing::info!("Transcoding enabled");
                                        // Load persisted TTL override from transcode_settings table.
                                        let tm_clone = Arc::clone(tm);
                                        let repo = build_transcode_settings_repository(db_pool).await.ok();
                                        tokio::spawn(async move {
                                            if let Some(r) = repo {
                                                if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
                                                    tm_clone.set_cleanup_ttl(ttl);
                                                }
                                            }
                                        });
                                    }
                                    registry.register("local", bundle.provider);
                                    transcode_manager = bundle.transcode_manager;
                                    local_index = Some(bundle.local_index);
                                }
                                Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
                            }
                        }
                    }
                }
                // Unknown provider types (or ones compiled out) are ignored.
                _ => {}
            }
        }
    } else {
        // Env-based configuration: each provider is built from its own vars.
        #[cfg(feature = "jellyfin")]
        if let (Some(base_url), Some(api_key), Some(user_id)) = (
            &config.jellyfin_base_url,
            &config.jellyfin_api_key,
            &config.jellyfin_user_id,
        ) {
            tracing::info!("Media provider: Jellyfin at {}", base_url);
            registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(infra::JellyfinConfig {
                base_url: base_url.clone(),
                api_key: api_key.clone(),
                user_id: user_id.clone(),
            })));
        }
        #[cfg(feature = "local-files")]
        if let Some(dir) = &config.local_files_dir {
            tracing::info!("Media provider: local files at {:?}", dir);
            match infra::factory::build_local_files_bundle(
                db_pool,
                dir.clone(),
                config.transcode_dir.clone(),
                config.transcode_cleanup_ttl_hours,
                config.base_url.clone(),
            ).await {
                Ok(bundle) => {
                    // Same wiring as the DB path: background scan + persisted TTL override.
                    let scan_idx = Arc::clone(&bundle.local_index);
                    tokio::spawn(async move { scan_idx.rescan().await; });
                    if let Some(ref tm) = bundle.transcode_manager {
                        tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
                        let tm_clone = Arc::clone(tm);
                        let repo = build_transcode_settings_repository(db_pool).await.ok();
                        tokio::spawn(async move {
                            if let Some(r) = repo {
                                if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
                                    tm_clone.set_cleanup_ttl(ttl);
                                }
                            }
                        });
                    }
                    registry.register("local", bundle.provider);
                    transcode_manager = bundle.transcode_manager;
                    local_index = Some(bundle.local_index);
                }
                Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
            }
        }
    }
    if registry.is_empty() {
        tracing::warn!("No media provider configured. Set JELLYFIN_BASE_URL / LOCAL_FILES_DIR.");
        registry.register("noop", Arc::new(NoopMediaProvider));
    }
    Ok(ProviderBundle {
        registry: Arc::new(registry),
        #[cfg(feature = "local-files")]
        local_index,
        #[cfg(feature = "local-files")]
        transcode_manager,
    })
}
/// Stand-in provider used when no real media source is configured.
/// Returns a descriptive error for every call so schedule endpoints fail
/// gracefully rather than panicking at startup.
struct NoopMediaProvider;
#[async_trait::async_trait]
impl IMediaProvider for NoopMediaProvider {
fn capabilities(&self) -> ProviderCapabilities {
ProviderCapabilities {
collections: false,
series: false,
genres: false,
tags: false,
decade: false,
search: false,
streaming_protocol: StreamingProtocol::DirectFile,
rescan: false,
transcode: false,
}
}
async fn fetch_items(
&self,
_: &domain::MediaFilter,
) -> domain::DomainResult<Vec<domain::MediaItem>> {
Err(DomainError::InfrastructureError(
"No media provider configured. Set JELLYFIN_BASE_URL or LOCAL_FILES_DIR.".into(),
))
}
async fn fetch_by_id(
&self,
_: &domain::MediaItemId,
) -> domain::DomainResult<Option<domain::MediaItem>> {
Err(DomainError::InfrastructureError(
"No media provider configured.".into(),
))
}
async fn get_stream_url(
&self,
_: &domain::MediaItemId,
_: &StreamQuality,
) -> domain::DomainResult<String> {
Err(DomainError::InfrastructureError(
"No media provider configured.".into(),
))
}
}

View File

@@ -0,0 +1,59 @@
use std::net::SocketAddr;
use axum::Router;
use axum::http::{HeaderName, HeaderValue};
use k_core::http::server::{ServerConfig, apply_standard_middleware};
use tokio::net::TcpListener;
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
use crate::config::Config;
use crate::routes;
use crate::state::AppState;
/// Assemble the Axum application, apply middleware and CORS, bind the TCP
/// listener, and serve requests until the server terminates.
///
/// CORS is layered twice on purpose: `apply_standard_middleware` installs
/// k_core's inner layer, and an outer [`CorsLayer`] adds the custom password
/// headers so OPTIONS preflights are answered before the inner layer runs.
///
/// # Errors
/// Fails if the host/port pair does not parse as a socket address, the
/// listener cannot bind, or serving returns an error.
pub async fn build_and_serve(state: AppState, config: &Config) -> anyhow::Result<()> {
    let server_config = ServerConfig {
        cors_origins: config.cors_allowed_origins.clone(),
    };
    let app = Router::new()
        .nest("/api/v1", routes::api_v1_router())
        .with_state(state);
    let app = apply_standard_middleware(app, &server_config);
    // Parse configured origins, warning about — rather than silently
    // dropping — any entry that is not a valid header value.
    let origins: Vec<HeaderValue> = config
        .cors_allowed_origins
        .iter()
        .filter_map(|o| match o.parse::<HeaderValue>() {
            Ok(v) => Some(v),
            Err(e) => {
                tracing::warn!("Ignoring invalid CORS origin {:?}: {}", o, e);
                None
            }
        })
        .collect();
    let cors = CorsLayer::new()
        .allow_origin(AllowOrigin::list(origins))
        .allow_methods(AllowMethods::any())
        .allow_headers(AllowHeaders::list([
            axum::http::header::AUTHORIZATION,
            axum::http::header::CONTENT_TYPE,
            HeaderName::from_static("x-channel-password"),
            HeaderName::from_static("x-block-password"),
        ]));
    let app = app.layer(cors);
    let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
    let listener = TcpListener::bind(addr).await?;
    tracing::info!("🚀 API server running at http://{}", addr);
    tracing::info!("🔒 Authentication mode: JWT (Bearer token)");
    #[cfg(feature = "auth-jwt")]
    tracing::info!(" ✓ JWT auth enabled");
    #[cfg(feature = "auth-oidc")]
    tracing::info!(" ✓ OIDC integration enabled (stateless cookie state)");
    tracing::info!("📝 API endpoints available at /api/v1/...");
    axum::serve(listener, app).await?;
    Ok(())
}

View File

@@ -0,0 +1,24 @@
use std::sync::Arc;
use domain::{ChannelRepository, DomainEvent, ScheduleEngineService};
use tokio::sync::broadcast;
use crate::{poller, scheduler};
/// Launch the long-running background workers — the auto-scheduler and the
/// broadcast poller — as detached tasks on the Tokio runtime.
pub fn spawn_background_tasks(
    schedule_engine: Arc<ScheduleEngineService>,
    channel_repo: Arc<dyn ChannelRepository>,
    event_tx: broadcast::Sender<DomainEvent>,
) {
    // The scheduler receives cloned handles; the poller consumes the originals.
    tokio::spawn(scheduler::run_auto_scheduler(
        Arc::clone(&schedule_engine),
        Arc::clone(&channel_repo),
        event_tx.clone(),
    ));
    tokio::spawn(poller::run_broadcast_poller(
        schedule_engine,
        channel_repo,
        event_tx,
    ));
}

View File

@@ -0,0 +1,25 @@
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
use crate::log_layer::{AppLogLayer, LogLine};
/// Handles produced by [`init_tracing`] for wiring log streaming into the
/// application state.
pub struct LoggingHandles {
    /// Broadcast sender that fans each captured log line out to subscribers.
    pub log_tx: broadcast::Sender<LogLine>,
    /// Shared in-memory buffer of recent log lines (a `VecDeque` used as a
    /// ring buffer for SSE log streaming).
    pub log_history: Arc<Mutex<VecDeque<LogLine>>>,
}
/// Initialize the global tracing subscriber and return the handles needed to
/// stream logs to SSE clients.
///
/// Installs three layers: an env filter (defaulting to `info` when
/// `RUST_LOG` is unset or invalid), the standard fmt layer, and the custom
/// [`AppLogLayer`] that copies each log line into a broadcast channel and a
/// shared history buffer.
pub fn init_tracing() -> LoggingHandles {
    // Bounded capacity of the log broadcast channel; named instead of a
    // bare magic number so it is easy to find and tune.
    const LOG_CHANNEL_CAPACITY: usize = 512;
    let (log_tx, _) = broadcast::channel::<LogLine>(LOG_CHANNEL_CAPACITY);
    let log_history = Arc::new(Mutex::new(VecDeque::<LogLine>::new()));
    tracing_subscriber::registry()
        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
        .with(fmt::layer())
        .with(AppLogLayer::new(log_tx.clone(), Arc::clone(&log_history)))
        .init();
    LoggingHandles { log_tx, log_history }
}