Compare commits
11 Commits
b35054f23e
...
9d792249c9
| Author | SHA1 | Date | |
|---|---|---|---|
| 9d792249c9 | |||
| 50df852416 | |||
| d88afbfe2e | |||
| 0637504974 | |||
| 712cf1deb9 | |||
| 89036ba62d | |||
| 87f94fcc51 | |||
| 46333853d2 | |||
| 0e51b7c0f1 | |||
| 4ca8690a89 | |||
| d80d4e9741 |
1
k-tv-backend/Cargo.lock
generated
1
k-tv-backend/Cargo.lock
generated
@@ -86,7 +86,6 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_qs",
|
||||
"sqlx",
|
||||
"thiserror 2.0.17",
|
||||
"time",
|
||||
"tokio",
|
||||
|
||||
@@ -11,7 +11,7 @@ postgres = ["infra/postgres"]
|
||||
auth-oidc = ["infra/auth-oidc"]
|
||||
auth-jwt = ["infra/auth-jwt"]
|
||||
jellyfin = ["infra/jellyfin"]
|
||||
local-files = ["infra/local-files", "dep:tokio-util", "dep:sqlx"]
|
||||
local-files = ["infra/local-files", "dep:tokio-util"]
|
||||
|
||||
[profile.release]
|
||||
strip = true
|
||||
@@ -65,4 +65,3 @@ async-trait = "0.1"
|
||||
dotenvy = "0.15.7"
|
||||
time = "0.3"
|
||||
tokio-util = { version = "0.7", features = ["io"], optional = true }
|
||||
sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite"], optional = true }
|
||||
|
||||
@@ -5,9 +5,16 @@
|
||||
use std::env;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum ConfigSource {
|
||||
Env,
|
||||
Db,
|
||||
}
|
||||
|
||||
/// Application configuration loaded from environment variables
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub config_source: ConfigSource,
|
||||
pub database_url: String,
|
||||
pub cookie_secret: String,
|
||||
pub cors_allowed_origins: Vec<String>,
|
||||
@@ -134,7 +141,13 @@ impl Config {
|
||||
let base_url = env::var("BASE_URL")
|
||||
.unwrap_or_else(|_| format!("http://localhost:{}", port));
|
||||
|
||||
let config_source = match env::var("CONFIG_SOURCE").as_deref() {
|
||||
Ok("db") | Ok("DB") => ConfigSource::Db,
|
||||
_ => ConfigSource::Env,
|
||||
};
|
||||
|
||||
Self {
|
||||
config_source,
|
||||
host,
|
||||
port,
|
||||
database_url,
|
||||
|
||||
@@ -32,6 +32,7 @@ pub struct UserResponse {
|
||||
pub id: Uuid,
|
||||
pub email: String,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub is_admin: bool,
|
||||
}
|
||||
|
||||
/// JWT token response
|
||||
@@ -57,6 +58,8 @@ pub struct ConfigResponse {
|
||||
pub providers: Vec<ProviderInfo>,
|
||||
/// Capabilities of the primary provider — kept for backward compatibility.
|
||||
pub provider_capabilities: domain::ProviderCapabilities,
|
||||
/// Provider type strings supported by this build (feature-gated).
|
||||
pub available_provider_types: Vec<String>,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
|
||||
@@ -78,6 +78,21 @@ impl FromRequestParts<AppState> for OptionalCurrentUser {
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracted admin user — returns 403 if user is not an admin.
|
||||
pub struct AdminUser(pub User);
|
||||
|
||||
impl FromRequestParts<AppState> for AdminUser {
|
||||
type Rejection = ApiError;
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, state: &AppState) -> Result<Self, Self::Rejection> {
|
||||
let CurrentUser(user) = CurrentUser::from_request_parts(parts, state).await?;
|
||||
if !user.is_admin {
|
||||
return Err(ApiError::Forbidden("Admin access required".to_string()));
|
||||
}
|
||||
Ok(AdminUser(user))
|
||||
}
|
||||
}
|
||||
|
||||
/// Authenticate using JWT Bearer token from the `Authorization` header.
|
||||
#[cfg(feature = "auth-jwt")]
|
||||
async fn try_jwt_auth(parts: &mut Parts, state: &AppState) -> Result<User, ApiError> {
|
||||
|
||||
@@ -15,7 +15,9 @@ use tracing::info;
|
||||
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService};
|
||||
use infra::factory::{build_activity_log_repository, build_channel_repository, build_schedule_repository, build_user_repository};
|
||||
use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository};
|
||||
#[cfg(feature = "local-files")]
|
||||
use infra::factory::build_transcode_settings_repository;
|
||||
use infra::run_migrations;
|
||||
use k_core::http::server::{ServerConfig, apply_standard_middleware};
|
||||
use tokio::net::TcpListener;
|
||||
@@ -32,7 +34,7 @@ mod scheduler;
|
||||
mod state;
|
||||
mod webhook;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::config::{Config, ConfigSource};
|
||||
use crate::state::AppState;
|
||||
|
||||
#[tokio::main]
|
||||
@@ -79,6 +81,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let db_pool = k_core::db::connect(&db_config).await?;
|
||||
run_migrations(&db_pool).await?;
|
||||
let db_pool = Arc::new(db_pool);
|
||||
|
||||
let user_repo = build_user_repository(&db_pool).await?;
|
||||
let channel_repo = build_channel_repository(&db_pool).await?;
|
||||
@@ -93,11 +96,71 @@ async fn main() -> anyhow::Result<()> {
|
||||
let mut local_index: Option<Arc<infra::LocalIndex>> = None;
|
||||
#[cfg(feature = "local-files")]
|
||||
let mut transcode_manager: Option<Arc<infra::TranscodeManager>> = None;
|
||||
#[cfg(feature = "local-files")]
|
||||
let mut sqlite_pool_for_state: Option<sqlx::SqlitePool> = None;
|
||||
|
||||
let mut registry = infra::ProviderRegistry::new();
|
||||
|
||||
let provider_config_repo = build_provider_config_repository(&db_pool).await?;
|
||||
|
||||
if config.config_source == ConfigSource::Db {
|
||||
tracing::info!("CONFIG_SOURCE=db: loading provider configs from database");
|
||||
let rows = provider_config_repo.get_all().await?;
|
||||
for row in &rows {
|
||||
if !row.enabled { continue; }
|
||||
match row.provider_type.as_str() {
|
||||
#[cfg(feature = "jellyfin")]
|
||||
"jellyfin" => {
|
||||
if let Ok(cfg) = serde_json::from_str::<infra::JellyfinConfig>(&row.config_json) {
|
||||
tracing::info!("Loading Jellyfin provider from DB config");
|
||||
registry.register("jellyfin", Arc::new(infra::JellyfinMediaProvider::new(cfg)));
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "local-files")]
|
||||
"local_files" => {
|
||||
if let Ok(cfg_map) = serde_json::from_str::<std::collections::HashMap<String, String>>(&row.config_json) {
|
||||
if let Some(files_dir) = cfg_map.get("files_dir") {
|
||||
let transcode_dir = cfg_map.get("transcode_dir")
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(std::path::PathBuf::from);
|
||||
let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours")
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(24);
|
||||
tracing::info!("Loading local-files provider from DB config at {:?}", files_dir);
|
||||
match infra::factory::build_local_files_bundle(
|
||||
&db_pool,
|
||||
std::path::PathBuf::from(files_dir),
|
||||
transcode_dir,
|
||||
cleanup_ttl_hours,
|
||||
config.base_url.clone(),
|
||||
).await {
|
||||
Ok(bundle) => {
|
||||
let scan_idx = Arc::clone(&bundle.local_index);
|
||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||
if let Some(ref tm) = bundle.transcode_manager {
|
||||
tracing::info!("Transcoding enabled");
|
||||
// Load persisted TTL override from transcode_settings table.
|
||||
let tm_clone = Arc::clone(tm);
|
||||
let repo = build_transcode_settings_repository(&db_pool).await.ok();
|
||||
tokio::spawn(async move {
|
||||
if let Some(r) = repo {
|
||||
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
||||
tm_clone.set_cleanup_ttl(ttl);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
registry.register("local", bundle.provider);
|
||||
transcode_manager = bundle.transcode_manager;
|
||||
local_index = Some(bundle.local_index);
|
||||
}
|
||||
Err(e) => tracing::warn!("Failed to build local-files provider: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
#[cfg(feature = "jellyfin")]
|
||||
if let (Some(base_url), Some(api_key), Some(user_id)) = (
|
||||
&config.jellyfin_base_url,
|
||||
@@ -114,48 +177,35 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
if let Some(dir) = &config.local_files_dir {
|
||||
if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool {
|
||||
tracing::info!("Media provider: local files at {:?}", dir);
|
||||
let lf_cfg = infra::LocalFilesConfig {
|
||||
root_dir: dir.clone(),
|
||||
base_url: config.base_url.clone(),
|
||||
transcode_dir: config.transcode_dir.clone(),
|
||||
cleanup_ttl_hours: config.transcode_cleanup_ttl_hours,
|
||||
};
|
||||
let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await);
|
||||
local_index = Some(Arc::clone(&idx));
|
||||
let scan_idx = Arc::clone(&idx);
|
||||
match infra::factory::build_local_files_bundle(
|
||||
&db_pool,
|
||||
dir.clone(),
|
||||
config.transcode_dir.clone(),
|
||||
config.transcode_cleanup_ttl_hours,
|
||||
config.base_url.clone(),
|
||||
).await {
|
||||
Ok(bundle) => {
|
||||
let scan_idx = Arc::clone(&bundle.local_index);
|
||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||
|
||||
// Build TranscodeManager if TRANSCODE_DIR is set.
|
||||
let tm = config.transcode_dir.as_ref().map(|td| {
|
||||
std::fs::create_dir_all(td).ok();
|
||||
tracing::info!("Transcoding enabled; cache dir: {:?}", td);
|
||||
let tm = infra::TranscodeManager::new(td.clone(), config.transcode_cleanup_ttl_hours);
|
||||
// Load persisted TTL from DB.
|
||||
let tm_clone = Arc::clone(&tm);
|
||||
let pool_clone = sqlite_pool.clone();
|
||||
if let Some(ref tm) = bundle.transcode_manager {
|
||||
tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir);
|
||||
let tm_clone = Arc::clone(tm);
|
||||
let repo = build_transcode_settings_repository(&db_pool).await.ok();
|
||||
tokio::spawn(async move {
|
||||
if let Ok(row) = sqlx::query_as::<_, (i64,)>(
|
||||
"SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1",
|
||||
)
|
||||
.fetch_one(&pool_clone)
|
||||
.await
|
||||
{
|
||||
tm_clone.set_cleanup_ttl(row.0 as u32);
|
||||
if let Some(r) = repo {
|
||||
if let Ok(Some(ttl)) = r.load_cleanup_ttl().await {
|
||||
tm_clone.set_cleanup_ttl(ttl);
|
||||
}
|
||||
}
|
||||
});
|
||||
tm
|
||||
});
|
||||
|
||||
registry.register(
|
||||
"local",
|
||||
Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg, tm.clone())),
|
||||
);
|
||||
transcode_manager = tm;
|
||||
sqlite_pool_for_state = Some(sqlite_pool.clone());
|
||||
} else {
|
||||
tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR");
|
||||
}
|
||||
registry.register("local", bundle.provider);
|
||||
transcode_manager = bundle.transcode_manager;
|
||||
local_index = Some(bundle.local_index);
|
||||
}
|
||||
Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,7 +214,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
registry.register("noop", Arc::new(NoopMediaProvider));
|
||||
}
|
||||
|
||||
let registry = Arc::new(registry);
|
||||
let registry_arc = Arc::new(registry);
|
||||
let provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>> =
|
||||
Arc::new(tokio::sync::RwLock::new(Arc::clone(®istry_arc)));
|
||||
|
||||
let (event_tx, event_rx) = tokio::sync::broadcast::channel::<domain::DomainEvent>(64);
|
||||
|
||||
@@ -177,30 +229,39 @@ async fn main() -> anyhow::Result<()> {
|
||||
));
|
||||
|
||||
let schedule_engine = ScheduleEngineService::new(
|
||||
Arc::clone(®istry) as Arc<dyn IProviderRegistry>,
|
||||
Arc::clone(®istry_arc) as Arc<dyn IProviderRegistry>,
|
||||
channel_repo,
|
||||
schedule_repo,
|
||||
);
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
let transcode_settings_repo = build_transcode_settings_repository(&db_pool).await.ok();
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut state = AppState::new(
|
||||
user_service,
|
||||
channel_service,
|
||||
schedule_engine,
|
||||
registry,
|
||||
provider_registry,
|
||||
provider_config_repo,
|
||||
config.clone(),
|
||||
event_tx.clone(),
|
||||
log_tx,
|
||||
log_history,
|
||||
activity_log_repo,
|
||||
db_pool,
|
||||
#[cfg(feature = "local-files")]
|
||||
transcode_settings_repo,
|
||||
)
|
||||
.await?;
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
{
|
||||
state.local_index = local_index;
|
||||
state.transcode_manager = transcode_manager;
|
||||
state.sqlite_pool = sqlite_pool_for_state;
|
||||
if let Some(idx) = local_index {
|
||||
*state.local_index.write().await = Some(idx);
|
||||
}
|
||||
#[cfg(feature = "local-files")]
|
||||
if let Some(tm) = transcode_manager {
|
||||
*state.transcode_manager.write().await = Some(tm);
|
||||
}
|
||||
|
||||
let server_config = ServerConfig {
|
||||
|
||||
@@ -17,7 +17,7 @@ use domain::{ChannelRepository, DomainError, DomainEvent, ScheduleEngineService}
|
||||
|
||||
/// Per-channel poller state.
|
||||
#[derive(Debug)]
|
||||
struct ChannelPollState {
|
||||
pub struct ChannelPollState {
|
||||
/// ID of the last slot we saw as current (None = no signal).
|
||||
last_slot_id: Option<Uuid>,
|
||||
/// Wall-clock instant of the last poll for this channel.
|
||||
@@ -80,13 +80,11 @@ pub(crate) async fn poll_tick(
|
||||
|
||||
// Find the current slot
|
||||
let current_slot_id = match schedule_engine.get_active_schedule(channel.id, now).await {
|
||||
Ok(Some(schedule)) => {
|
||||
schedule
|
||||
Ok(Some(schedule)) => schedule
|
||||
.slots
|
||||
.iter()
|
||||
.find(|s| s.start_at <= now && now < s.end_at)
|
||||
.map(|s| s.id)
|
||||
}
|
||||
.map(|s| s.id),
|
||||
Ok(None) => None,
|
||||
Err(DomainError::NoActiveSchedule(_)) => None,
|
||||
Err(DomainError::ChannelNotFound(_)) => {
|
||||
@@ -109,7 +107,9 @@ pub(crate) async fn poll_tick(
|
||||
// State changed — emit appropriate event
|
||||
match ¤t_slot_id {
|
||||
Some(slot_id) => {
|
||||
if let Ok(Some(schedule)) = schedule_engine.get_active_schedule(channel.id, now).await {
|
||||
if let Ok(Some(schedule)) =
|
||||
schedule_engine.get_active_schedule(channel.id, now).await
|
||||
{
|
||||
if let Some(slot) = schedule.slots.iter().find(|s| s.id == *slot_id).cloned() {
|
||||
let _ = event_tx.send(DomainEvent::BroadcastTransition {
|
||||
channel_id: channel.id,
|
||||
@@ -137,12 +137,12 @@ mod tests {
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use domain::value_objects::{ChannelId, ContentType, UserId};
|
||||
use domain::{
|
||||
Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry,
|
||||
MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities,
|
||||
ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, StreamingProtocol,
|
||||
ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality,
|
||||
};
|
||||
use domain::value_objects::{ChannelId, ContentType, UserId};
|
||||
use tokio::sync::broadcast;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -188,14 +188,20 @@ mod tests {
|
||||
) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
Ok(self.active.clone())
|
||||
}
|
||||
async fn find_latest(&self, _channel_id: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
async fn find_latest(
|
||||
&self,
|
||||
_channel_id: ChannelId,
|
||||
) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
Ok(self.active.clone())
|
||||
}
|
||||
async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> {
|
||||
self.saved.lock().unwrap().push(schedule.clone());
|
||||
Ok(())
|
||||
}
|
||||
async fn find_playback_history(&self, _channel_id: ChannelId) -> DomainResult<Vec<PlaybackRecord>> {
|
||||
async fn find_playback_history(
|
||||
&self,
|
||||
_channel_id: ChannelId,
|
||||
) -> DomainResult<Vec<PlaybackRecord>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> {
|
||||
@@ -207,13 +213,21 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl IProviderRegistry for MockRegistry {
|
||||
async fn fetch_items(&self, _provider_id: &str, _filter: &MediaFilter) -> DomainResult<Vec<MediaItem>> {
|
||||
async fn fetch_items(
|
||||
&self,
|
||||
_provider_id: &str,
|
||||
_filter: &MediaFilter,
|
||||
) -> DomainResult<Vec<MediaItem>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
async fn fetch_by_id(&self, _item_id: &MediaItemId) -> DomainResult<Option<MediaItem>> {
|
||||
Ok(None)
|
||||
}
|
||||
async fn get_stream_url(&self, _item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult<String> {
|
||||
async fn get_stream_url(
|
||||
&self,
|
||||
_item_id: &MediaItemId,
|
||||
_quality: &StreamQuality,
|
||||
) -> DomainResult<String> {
|
||||
unimplemented!()
|
||||
}
|
||||
fn provider_ids(&self) -> Vec<String> {
|
||||
@@ -228,10 +242,18 @@ mod tests {
|
||||
async fn list_collections(&self, _provider_id: &str) -> DomainResult<Vec<Collection>> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn list_series(&self, _provider_id: &str, _collection_id: Option<&str>) -> DomainResult<Vec<SeriesSummary>> {
|
||||
async fn list_series(
|
||||
&self,
|
||||
_provider_id: &str,
|
||||
_collection_id: Option<&str>,
|
||||
) -> DomainResult<Vec<SeriesSummary>> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn list_genres(&self, _provider_id: &str, _content_type: Option<&ContentType>) -> DomainResult<Vec<String>> {
|
||||
async fn list_genres(
|
||||
&self,
|
||||
_provider_id: &str,
|
||||
_content_type: Option<&ContentType>,
|
||||
) -> DomainResult<Vec<String>> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
@@ -318,7 +340,10 @@ mod tests {
|
||||
|
||||
let event = event_rx.try_recv().expect("expected an event");
|
||||
match event {
|
||||
DomainEvent::BroadcastTransition { channel_id: cid, slot: s } => {
|
||||
DomainEvent::BroadcastTransition {
|
||||
channel_id: cid,
|
||||
slot: s,
|
||||
} => {
|
||||
assert_eq!(cid, channel_id);
|
||||
assert_eq!(s.id, slot_id);
|
||||
}
|
||||
@@ -388,11 +413,18 @@ mod tests {
|
||||
async fn find_latest(&self, _: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
Ok(None)
|
||||
}
|
||||
async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> { Ok(()) }
|
||||
async fn find_playback_history(&self, _: ChannelId) -> DomainResult<Vec<PlaybackRecord>> {
|
||||
async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
async fn find_playback_history(
|
||||
&self,
|
||||
_: ChannelId,
|
||||
) -> DomainResult<Vec<PlaybackRecord>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> { Ok(()) }
|
||||
async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
408
k-tv-backend/api/src/routes/admin_providers.rs
Normal file
408
k-tv-backend/api/src/routes/admin_providers.rs
Normal file
@@ -0,0 +1,408 @@
|
||||
//! Admin provider management routes.
|
||||
//!
|
||||
//! All routes require an admin user. Allows listing, updating, deleting, and
|
||||
//! testing media provider configs stored in the DB. Only available when
|
||||
//! CONFIG_SOURCE=db.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::Router;
|
||||
use axum::extract::{Path, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::routing::{get, post, put, delete};
|
||||
use axum::Json;
|
||||
use domain::errors::DomainResult;
|
||||
use domain::ProviderConfigRow;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::config::ConfigSource;
|
||||
use crate::error::ApiError;
|
||||
use crate::extractors::AdminUser;
|
||||
use crate::state::AppState;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// DTOs
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ProviderConfigPayload {
|
||||
pub config_json: HashMap<String, String>,
|
||||
pub enabled: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ProviderConfigResponse {
|
||||
pub provider_type: String,
|
||||
pub config_json: HashMap<String, serde_json::Value>,
|
||||
pub enabled: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct TestResult {
|
||||
pub ok: bool,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Router
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub fn router() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", get(list_providers))
|
||||
.route("/{type}", put(update_provider).delete(delete_provider))
|
||||
.route("/{type}/test", post(test_provider))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn mask_config(raw: &str) -> HashMap<String, serde_json::Value> {
|
||||
let parsed: HashMap<String, serde_json::Value> =
|
||||
serde_json::from_str(raw).unwrap_or_default();
|
||||
parsed
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
let secret_key = ["key", "password", "secret", "token"]
|
||||
.iter()
|
||||
.any(|kw| k.to_lowercase().contains(kw));
|
||||
let masked = if secret_key {
|
||||
match &v {
|
||||
serde_json::Value::String(s) if !s.is_empty() => {
|
||||
serde_json::Value::String("***".to_string())
|
||||
}
|
||||
_ => v,
|
||||
}
|
||||
} else {
|
||||
v
|
||||
};
|
||||
(k, masked)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn conflict_response() -> impl IntoResponse {
|
||||
(
|
||||
StatusCode::CONFLICT,
|
||||
Json(serde_json::json!({
|
||||
"error": "UI config disabled — set CONFIG_SOURCE=db on the server"
|
||||
})),
|
||||
)
|
||||
}
|
||||
|
||||
async fn rebuild_registry(state: &AppState) -> DomainResult<()> {
|
||||
let rows = state.provider_config_repo.get_all().await?;
|
||||
let mut new_registry = infra::ProviderRegistry::new();
|
||||
|
||||
for row in &rows {
|
||||
if !row.enabled {
|
||||
continue;
|
||||
}
|
||||
match row.provider_type.as_str() {
|
||||
#[cfg(feature = "jellyfin")]
|
||||
"jellyfin" => {
|
||||
if let Ok(cfg) =
|
||||
serde_json::from_str::<infra::JellyfinConfig>(&row.config_json)
|
||||
{
|
||||
new_registry.register(
|
||||
"jellyfin",
|
||||
Arc::new(infra::JellyfinMediaProvider::new(cfg)),
|
||||
);
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "local-files")]
|
||||
"local_files" => {
|
||||
let config: std::collections::HashMap<String, String> =
|
||||
match serde_json::from_str(&row.config_json) {
|
||||
Ok(c) => c,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let files_dir = match config.get("files_dir") {
|
||||
Some(d) => std::path::PathBuf::from(d),
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let transcode_dir = config
|
||||
.get("transcode_dir")
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(std::path::PathBuf::from);
|
||||
|
||||
let cleanup_ttl_hours: u32 = config
|
||||
.get("cleanup_ttl_hours")
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(24);
|
||||
|
||||
let base_url = state.config.base_url.clone();
|
||||
|
||||
match infra::factory::build_local_files_bundle(
|
||||
&state.db_pool,
|
||||
files_dir,
|
||||
transcode_dir,
|
||||
cleanup_ttl_hours,
|
||||
base_url,
|
||||
).await {
|
||||
Ok(bundle) => {
|
||||
let scan_idx = Arc::clone(&bundle.local_index);
|
||||
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||
new_registry.register("local", bundle.provider);
|
||||
*state.local_index.write().await = Some(bundle.local_index);
|
||||
*state.transcode_manager.write().await = bundle.transcode_manager;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("local_files provider requires SQLite; skipping: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if new_registry.is_empty() {
|
||||
new_registry.register("noop", Arc::new(NoopMediaProvider));
|
||||
}
|
||||
|
||||
*state.provider_registry.write().await = Arc::new(new_registry);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Handlers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub async fn list_providers(
|
||||
State(state): State<AppState>,
|
||||
AdminUser(_user): AdminUser,
|
||||
) -> Result<impl IntoResponse, ApiError> {
|
||||
let rows = state
|
||||
.provider_config_repo
|
||||
.get_all()
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
let response: Vec<ProviderConfigResponse> = rows
|
||||
.iter()
|
||||
.map(|row| ProviderConfigResponse {
|
||||
provider_type: row.provider_type.clone(),
|
||||
config_json: mask_config(&row.config_json),
|
||||
enabled: row.enabled,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Json(response))
|
||||
}
|
||||
|
||||
pub async fn update_provider(
|
||||
State(state): State<AppState>,
|
||||
AdminUser(_user): AdminUser,
|
||||
Path(provider_type): Path<String>,
|
||||
Json(payload): Json<ProviderConfigPayload>,
|
||||
) -> Result<impl IntoResponse, ApiError> {
|
||||
if state.config.config_source != ConfigSource::Db {
|
||||
return Ok(conflict_response().into_response());
|
||||
}
|
||||
|
||||
let known = matches!(provider_type.as_str(), "jellyfin" | "local_files");
|
||||
if !known {
|
||||
return Err(ApiError::Validation(format!(
|
||||
"Unknown provider type: {}",
|
||||
provider_type
|
||||
)));
|
||||
}
|
||||
|
||||
let config_json = serde_json::to_string(&payload.config_json)
|
||||
.map_err(|e| ApiError::Internal(format!("Failed to serialize config: {}", e)))?;
|
||||
|
||||
let row = ProviderConfigRow {
|
||||
provider_type: provider_type.clone(),
|
||||
config_json: config_json.clone(),
|
||||
enabled: payload.enabled,
|
||||
updated_at: chrono::Utc::now().to_rfc3339(),
|
||||
};
|
||||
|
||||
state
|
||||
.provider_config_repo
|
||||
.upsert(&row)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
rebuild_registry(&state)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
let response = ProviderConfigResponse {
|
||||
provider_type,
|
||||
config_json: mask_config(&config_json),
|
||||
enabled: payload.enabled,
|
||||
};
|
||||
|
||||
Ok(Json(response).into_response())
|
||||
}
|
||||
|
||||
pub async fn delete_provider(
|
||||
State(state): State<AppState>,
|
||||
AdminUser(_user): AdminUser,
|
||||
Path(provider_type): Path<String>,
|
||||
) -> Result<impl IntoResponse, ApiError> {
|
||||
if state.config.config_source != ConfigSource::Db {
|
||||
return Ok(conflict_response().into_response());
|
||||
}
|
||||
|
||||
state
|
||||
.provider_config_repo
|
||||
.delete(&provider_type)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
rebuild_registry(&state)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
Ok(StatusCode::NO_CONTENT.into_response())
|
||||
}
|
||||
|
||||
pub async fn test_provider(
|
||||
State(_state): State<AppState>,
|
||||
AdminUser(_user): AdminUser,
|
||||
Path(provider_type): Path<String>,
|
||||
Json(payload): Json<ProviderConfigPayload>,
|
||||
) -> Result<impl IntoResponse, ApiError> {
|
||||
let result = match provider_type.as_str() {
|
||||
"jellyfin" => test_jellyfin(&payload.config_json).await,
|
||||
"local_files" => test_local_files(&payload.config_json),
|
||||
_ => TestResult {
|
||||
ok: false,
|
||||
message: "Unknown provider type".to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
Ok(Json(result))
|
||||
}
|
||||
|
||||
async fn test_jellyfin(config: &HashMap<String, String>) -> TestResult {
|
||||
let base_url = match config.get("base_url") {
|
||||
Some(u) => u.trim_end_matches('/').to_string(),
|
||||
None => {
|
||||
return TestResult {
|
||||
ok: false,
|
||||
message: "Missing field: base_url".to_string(),
|
||||
}
|
||||
}
|
||||
};
|
||||
let api_key = match config.get("api_key") {
|
||||
Some(k) => k.clone(),
|
||||
None => {
|
||||
return TestResult {
|
||||
ok: false,
|
||||
message: "Missing field: api_key".to_string(),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let url = format!("{}/System/Info", base_url);
|
||||
let client = reqwest::Client::new();
|
||||
match client
|
||||
.get(&url)
|
||||
.header("X-Emby-Token", &api_key)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(resp) => {
|
||||
let status = resp.status();
|
||||
if status.is_success() {
|
||||
TestResult {
|
||||
ok: true,
|
||||
message: format!("Connected successfully (HTTP {})", status.as_u16()),
|
||||
}
|
||||
} else {
|
||||
TestResult {
|
||||
ok: false,
|
||||
message: format!("Jellyfin returned HTTP {}", status.as_u16()),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => TestResult {
|
||||
ok: false,
|
||||
message: format!("Connection failed: {}", e),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn test_local_files(config: &HashMap<String, String>) -> TestResult {
|
||||
let path = match config.get("files_dir") {
|
||||
Some(p) => p.clone(),
|
||||
None => {
|
||||
return TestResult {
|
||||
ok: false,
|
||||
message: "Missing field: files_dir".to_string(),
|
||||
}
|
||||
}
|
||||
};
|
||||
let p = std::path::Path::new(&path);
|
||||
if p.exists() && p.is_dir() {
|
||||
TestResult {
|
||||
ok: true,
|
||||
message: format!("Directory exists: {}", path),
|
||||
}
|
||||
} else {
|
||||
TestResult {
|
||||
ok: false,
|
||||
message: format!("Path does not exist or is not a directory: {}", path),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// NoopMediaProvider (local copy — avoids pub-ing it from main.rs)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
struct NoopMediaProvider;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl domain::IMediaProvider for NoopMediaProvider {
|
||||
fn capabilities(&self) -> domain::ProviderCapabilities {
|
||||
domain::ProviderCapabilities {
|
||||
collections: false,
|
||||
series: false,
|
||||
genres: false,
|
||||
tags: false,
|
||||
decade: false,
|
||||
search: false,
|
||||
streaming_protocol: domain::StreamingProtocol::DirectFile,
|
||||
rescan: false,
|
||||
transcode: false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_items(
|
||||
&self,
|
||||
_: &domain::MediaFilter,
|
||||
) -> domain::DomainResult<Vec<domain::MediaItem>> {
|
||||
Err(domain::DomainError::InfrastructureError(
|
||||
"No media provider configured.".into(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn fetch_by_id(
|
||||
&self,
|
||||
_: &domain::MediaItemId,
|
||||
) -> domain::DomainResult<Option<domain::MediaItem>> {
|
||||
Err(domain::DomainError::InfrastructureError(
|
||||
"No media provider configured.".into(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn get_stream_url(
|
||||
&self,
|
||||
_: &domain::MediaItemId,
|
||||
_: &domain::StreamQuality,
|
||||
) -> domain::DomainResult<String> {
|
||||
Err(domain::DomainError::InfrastructureError(
|
||||
"No media provider configured.".into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -86,6 +86,7 @@ pub(super) async fn me(CurrentUser(user): CurrentUser) -> Result<impl IntoRespon
|
||||
id: user.id,
|
||||
email: user.email.into_inner(),
|
||||
created_at: user.created_at,
|
||||
is_admin: user.is_admin,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -9,21 +9,21 @@ pub fn router() -> Router<AppState> {
|
||||
}
|
||||
|
||||
async fn get_config(State(state): State<AppState>) -> Json<ConfigResponse> {
|
||||
let providers: Vec<ProviderInfo> = state
|
||||
.provider_registry
|
||||
let registry = state.provider_registry.read().await;
|
||||
|
||||
let providers: Vec<ProviderInfo> = registry
|
||||
.provider_ids()
|
||||
.into_iter()
|
||||
.filter_map(|id| {
|
||||
state.provider_registry.capabilities(&id).map(|caps| ProviderInfo {
|
||||
registry.capabilities(&id).map(|caps| ProviderInfo {
|
||||
id: id.clone(),
|
||||
capabilities: caps,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
let primary_capabilities = state
|
||||
.provider_registry
|
||||
.capabilities(state.provider_registry.primary_id())
|
||||
let primary_capabilities = registry
|
||||
.capabilities(registry.primary_id())
|
||||
.unwrap_or(ProviderCapabilities {
|
||||
collections: false,
|
||||
series: false,
|
||||
@@ -36,9 +36,16 @@ async fn get_config(State(state): State<AppState>) -> Json<ConfigResponse> {
|
||||
transcode: false,
|
||||
});
|
||||
|
||||
let mut available_provider_types = Vec::new();
|
||||
#[cfg(feature = "jellyfin")]
|
||||
available_provider_types.push("jellyfin".to_string());
|
||||
#[cfg(feature = "local-files")]
|
||||
available_provider_types.push("local_files".to_string());
|
||||
|
||||
Json(ConfigResponse {
|
||||
allow_registration: state.config.allow_registration,
|
||||
providers,
|
||||
provider_capabilities: primary_capabilities,
|
||||
available_provider_types,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -147,9 +147,7 @@ async fn trigger_rescan(
|
||||
State(state): State<AppState>,
|
||||
CurrentUser(_user): CurrentUser,
|
||||
) -> Result<Json<serde_json::Value>, ApiError> {
|
||||
let index = state
|
||||
.local_index
|
||||
.as_ref()
|
||||
let index = state.local_index.read().await.clone()
|
||||
.ok_or_else(|| ApiError::not_implemented("no local files provider active"))?;
|
||||
let count = index.rescan().await;
|
||||
Ok(Json(serde_json::json!({ "items_found": count })))
|
||||
@@ -164,9 +162,7 @@ async fn transcode_playlist(
|
||||
State(state): State<AppState>,
|
||||
Path(id): Path<String>,
|
||||
) -> Result<Response, ApiError> {
|
||||
let tm = state
|
||||
.transcode_manager
|
||||
.as_ref()
|
||||
let tm = state.transcode_manager.read().await.clone()
|
||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||
|
||||
let root = state.config.local_files_dir.as_ref().ok_or_else(|| {
|
||||
@@ -219,9 +215,7 @@ async fn transcode_segment(
|
||||
return Err(ApiError::Forbidden("invalid segment path".into()));
|
||||
}
|
||||
|
||||
let tm = state
|
||||
.transcode_manager
|
||||
.as_ref()
|
||||
let tm = state.transcode_manager.read().await.clone()
|
||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||
|
||||
let file_path = tm.transcode_dir.join(&id).join(&segment);
|
||||
@@ -262,19 +256,10 @@ async fn get_transcode_settings(
|
||||
State(state): State<AppState>,
|
||||
CurrentUser(_user): CurrentUser,
|
||||
) -> Result<Json<TranscodeSettingsResponse>, ApiError> {
|
||||
let pool = state
|
||||
.sqlite_pool
|
||||
.as_ref()
|
||||
.ok_or_else(|| ApiError::not_implemented("sqlite not available"))?;
|
||||
|
||||
let (ttl,): (i64,) =
|
||||
sqlx::query_as("SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1")
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(|e| ApiError::internal(e.to_string()))?;
|
||||
|
||||
let tm = state.transcode_manager.read().await.clone()
|
||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||
Ok(Json(TranscodeSettingsResponse {
|
||||
cleanup_ttl_hours: ttl as u32,
|
||||
cleanup_ttl_hours: tm.get_cleanup_ttl(),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -284,19 +269,14 @@ async fn update_transcode_settings(
|
||||
CurrentUser(_user): CurrentUser,
|
||||
Json(req): Json<UpdateTranscodeSettingsRequest>,
|
||||
) -> Result<Json<TranscodeSettingsResponse>, ApiError> {
|
||||
let pool = state
|
||||
.sqlite_pool
|
||||
.as_ref()
|
||||
.ok_or_else(|| ApiError::not_implemented("sqlite not available"))?;
|
||||
|
||||
let ttl = req.cleanup_ttl_hours as i64;
|
||||
sqlx::query("UPDATE transcode_settings SET cleanup_ttl_hours = ? WHERE id = 1")
|
||||
.bind(ttl)
|
||||
.execute(pool)
|
||||
if let Some(repo) = &state.transcode_settings_repo {
|
||||
repo.save_cleanup_ttl(req.cleanup_ttl_hours)
|
||||
.await
|
||||
.map_err(|e| ApiError::internal(e.to_string()))?;
|
||||
}
|
||||
|
||||
if let Some(tm) = &state.transcode_manager {
|
||||
let tm_opt = state.transcode_manager.read().await.clone();
|
||||
if let Some(tm) = tm_opt {
|
||||
tm.set_cleanup_ttl(req.cleanup_ttl_hours);
|
||||
}
|
||||
|
||||
@@ -310,9 +290,7 @@ async fn get_transcode_stats(
|
||||
State(state): State<AppState>,
|
||||
CurrentUser(_user): CurrentUser,
|
||||
) -> Result<Json<TranscodeStatsResponse>, ApiError> {
|
||||
let tm = state
|
||||
.transcode_manager
|
||||
.as_ref()
|
||||
let tm = state.transcode_manager.read().await.clone()
|
||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||
let (cache_size_bytes, item_count) = tm.cache_stats().await;
|
||||
Ok(Json(TranscodeStatsResponse {
|
||||
@@ -326,9 +304,7 @@ async fn clear_transcode_cache(
|
||||
State(state): State<AppState>,
|
||||
CurrentUser(_user): CurrentUser,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
let tm = state
|
||||
.transcode_manager
|
||||
.as_ref()
|
||||
let tm = state.transcode_manager.read().await.clone()
|
||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||
tm.clear_cache()
|
||||
.await
|
||||
|
||||
@@ -151,13 +151,14 @@ async fn list_collections(
|
||||
Query(params): Query<CollectionsQuery>,
|
||||
) -> Result<Json<Vec<CollectionResponse>>, ApiError> {
|
||||
let provider_id = params.provider.as_deref().unwrap_or("");
|
||||
let caps = state.provider_registry.capabilities(provider_id).ok_or_else(|| {
|
||||
let registry = state.provider_registry.read().await;
|
||||
let caps = registry.capabilities(provider_id).ok_or_else(|| {
|
||||
ApiError::validation(format!("Unknown provider '{}'", provider_id))
|
||||
})?;
|
||||
if !caps.collections {
|
||||
return Err(ApiError::not_implemented("collections not supported by this provider"));
|
||||
}
|
||||
let collections = state.provider_registry.list_collections(provider_id).await?;
|
||||
let collections = registry.list_collections(provider_id).await?;
|
||||
Ok(Json(collections.into_iter().map(Into::into).collect()))
|
||||
}
|
||||
|
||||
@@ -168,14 +169,14 @@ async fn list_series(
|
||||
Query(params): Query<SeriesQuery>,
|
||||
) -> Result<Json<Vec<SeriesResponse>>, ApiError> {
|
||||
let provider_id = params.provider.as_deref().unwrap_or("");
|
||||
let caps = state.provider_registry.capabilities(provider_id).ok_or_else(|| {
|
||||
let registry = state.provider_registry.read().await;
|
||||
let caps = registry.capabilities(provider_id).ok_or_else(|| {
|
||||
ApiError::validation(format!("Unknown provider '{}'", provider_id))
|
||||
})?;
|
||||
if !caps.series {
|
||||
return Err(ApiError::not_implemented("series not supported by this provider"));
|
||||
}
|
||||
let series = state
|
||||
.provider_registry
|
||||
let series = registry
|
||||
.list_series(provider_id, params.collection.as_deref())
|
||||
.await?;
|
||||
Ok(Json(series.into_iter().map(Into::into).collect()))
|
||||
@@ -188,14 +189,15 @@ async fn list_genres(
|
||||
Query(params): Query<GenresQuery>,
|
||||
) -> Result<Json<Vec<String>>, ApiError> {
|
||||
let provider_id = params.provider.as_deref().unwrap_or("");
|
||||
let caps = state.provider_registry.capabilities(provider_id).ok_or_else(|| {
|
||||
let registry = state.provider_registry.read().await;
|
||||
let caps = registry.capabilities(provider_id).ok_or_else(|| {
|
||||
ApiError::validation(format!("Unknown provider '{}'", provider_id))
|
||||
})?;
|
||||
if !caps.genres {
|
||||
return Err(ApiError::not_implemented("genres not supported by this provider"));
|
||||
}
|
||||
let ct = parse_content_type(params.content_type.as_deref())?;
|
||||
let genres = state.provider_registry.list_genres(provider_id, ct.as_ref()).await?;
|
||||
let genres = registry.list_genres(provider_id, ct.as_ref()).await?;
|
||||
Ok(Json(genres))
|
||||
}
|
||||
|
||||
@@ -228,7 +230,8 @@ async fn search_items(
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut items = state.provider_registry.fetch_items(provider_id, &filter).await?;
|
||||
let registry = state.provider_registry.read().await;
|
||||
let mut items = registry.fetch_items(provider_id, &filter).await?;
|
||||
|
||||
// Apply the same ordering the schedule engine uses so the preview reflects
|
||||
// what will actually be scheduled rather than raw provider order.
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::state::AppState;
|
||||
use axum::Router;
|
||||
|
||||
pub mod admin;
|
||||
pub mod admin_providers;
|
||||
pub mod auth;
|
||||
pub mod channels;
|
||||
pub mod config;
|
||||
@@ -17,6 +18,7 @@ pub mod library;
|
||||
pub fn api_v1_router() -> Router<AppState> {
|
||||
Router::new()
|
||||
.nest("/admin", admin::router())
|
||||
.nest("/admin/providers", admin_providers::router())
|
||||
.nest("/auth", auth::router())
|
||||
.nest("/channels", channels::router())
|
||||
.nest("/config", config::router())
|
||||
|
||||
@@ -15,14 +15,18 @@ use tokio::sync::broadcast;
|
||||
use crate::config::Config;
|
||||
use crate::events::EventBus;
|
||||
use crate::log_layer::LogLine;
|
||||
use domain::{ActivityLogRepository, ChannelService, ScheduleEngineService, UserService};
|
||||
use domain::{ActivityLogRepository, ChannelService, ProviderConfigRepository, ScheduleEngineService, UserService};
|
||||
#[cfg(feature = "local-files")]
|
||||
use domain::TranscodeSettingsRepository;
|
||||
use k_core::db::DatabasePool;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub user_service: Arc<UserService>,
|
||||
pub channel_service: Arc<ChannelService>,
|
||||
pub schedule_engine: Arc<ScheduleEngineService>,
|
||||
pub provider_registry: Arc<infra::ProviderRegistry>,
|
||||
pub provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>>,
|
||||
pub provider_config_repo: Arc<dyn ProviderConfigRepository>,
|
||||
pub cookie_key: Key,
|
||||
#[cfg(feature = "auth-oidc")]
|
||||
pub oidc_service: Option<Arc<OidcService>>,
|
||||
@@ -38,13 +42,15 @@ pub struct AppState {
|
||||
pub activity_log_repo: Arc<dyn ActivityLogRepository>,
|
||||
/// Index for the local-files provider, used by the rescan route.
|
||||
#[cfg(feature = "local-files")]
|
||||
pub local_index: Option<Arc<infra::LocalIndex>>,
|
||||
pub local_index: Arc<tokio::sync::RwLock<Option<Arc<infra::LocalIndex>>>>,
|
||||
/// TranscodeManager for FFmpeg HLS transcoding (requires TRANSCODE_DIR).
|
||||
#[cfg(feature = "local-files")]
|
||||
pub transcode_manager: Option<Arc<infra::TranscodeManager>>,
|
||||
/// SQLite pool for transcode settings CRUD.
|
||||
pub transcode_manager: Arc<tokio::sync::RwLock<Option<Arc<infra::TranscodeManager>>>>,
|
||||
/// Repository for transcode settings persistence.
|
||||
#[cfg(feature = "local-files")]
|
||||
pub sqlite_pool: Option<sqlx::SqlitePool>,
|
||||
pub transcode_settings_repo: Option<Arc<dyn TranscodeSettingsRepository>>,
|
||||
/// Database pool — used by infra factory functions for hot-reload.
|
||||
pub db_pool: Arc<DatabasePool>,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
@@ -52,12 +58,16 @@ impl AppState {
|
||||
user_service: UserService,
|
||||
channel_service: ChannelService,
|
||||
schedule_engine: ScheduleEngineService,
|
||||
provider_registry: Arc<infra::ProviderRegistry>,
|
||||
provider_registry: Arc<tokio::sync::RwLock<Arc<infra::ProviderRegistry>>>,
|
||||
provider_config_repo: Arc<dyn ProviderConfigRepository>,
|
||||
config: Config,
|
||||
event_tx: EventBus,
|
||||
log_tx: broadcast::Sender<LogLine>,
|
||||
log_history: Arc<Mutex<VecDeque<LogLine>>>,
|
||||
activity_log_repo: Arc<dyn ActivityLogRepository>,
|
||||
db_pool: Arc<DatabasePool>,
|
||||
#[cfg(feature = "local-files")]
|
||||
transcode_settings_repo: Option<Arc<dyn TranscodeSettingsRepository>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let cookie_key = Key::derive_from(config.cookie_secret.as_bytes());
|
||||
|
||||
@@ -123,6 +133,7 @@ impl AppState {
|
||||
channel_service: Arc::new(channel_service),
|
||||
schedule_engine: Arc::new(schedule_engine),
|
||||
provider_registry,
|
||||
provider_config_repo,
|
||||
cookie_key,
|
||||
#[cfg(feature = "auth-oidc")]
|
||||
oidc_service,
|
||||
@@ -134,11 +145,12 @@ impl AppState {
|
||||
log_history,
|
||||
activity_log_repo,
|
||||
#[cfg(feature = "local-files")]
|
||||
local_index: None,
|
||||
local_index: Arc::new(tokio::sync::RwLock::new(None)),
|
||||
#[cfg(feature = "local-files")]
|
||||
transcode_manager: None,
|
||||
transcode_manager: Arc::new(tokio::sync::RwLock::new(None)),
|
||||
#[cfg(feature = "local-files")]
|
||||
sqlite_pool: None,
|
||||
transcode_settings_repo,
|
||||
db_pool,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ pub struct User {
|
||||
pub subject: String,
|
||||
pub email: Email,
|
||||
pub password_hash: Option<String>,
|
||||
pub is_admin: bool,
|
||||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
@@ -32,6 +33,7 @@ impl User {
|
||||
subject: subject.into(),
|
||||
email,
|
||||
password_hash: None,
|
||||
is_admin: false,
|
||||
created_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
@@ -41,6 +43,7 @@ impl User {
|
||||
subject: impl Into<String>,
|
||||
email: Email,
|
||||
password_hash: Option<String>,
|
||||
is_admin: bool,
|
||||
created_at: DateTime<Utc>,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -48,6 +51,7 @@ impl User {
|
||||
subject: subject.into(),
|
||||
email,
|
||||
password_hash,
|
||||
is_admin,
|
||||
created_at,
|
||||
}
|
||||
}
|
||||
@@ -58,6 +62,7 @@ impl User {
|
||||
subject: format!("local|{}", Uuid::new_v4()),
|
||||
email,
|
||||
password_hash: Some(password_hash.into()),
|
||||
is_admin: false,
|
||||
created_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,24 @@ pub trait UserRepository: Send + Sync {
|
||||
|
||||
/// Delete a user by their ID
|
||||
async fn delete(&self, id: Uuid) -> DomainResult<()>;
|
||||
|
||||
/// Count total number of users (used for first-user admin promotion)
|
||||
async fn count_users(&self) -> DomainResult<u64>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProviderConfigRow {
|
||||
pub provider_type: String,
|
||||
pub config_json: String,
|
||||
pub enabled: bool,
|
||||
pub updated_at: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ProviderConfigRepository: Send + Sync {
|
||||
async fn get_all(&self) -> DomainResult<Vec<ProviderConfigRow>>;
|
||||
async fn upsert(&self, row: &ProviderConfigRow) -> DomainResult<()>;
|
||||
async fn delete(&self, provider_type: &str) -> DomainResult<()>;
|
||||
}
|
||||
|
||||
/// Repository port for `Channel` persistence.
|
||||
@@ -93,3 +111,12 @@ pub trait ActivityLogRepository: Send + Sync {
|
||||
) -> DomainResult<()>;
|
||||
async fn recent(&self, limit: u32) -> DomainResult<Vec<ActivityEvent>>;
|
||||
}
|
||||
|
||||
/// Repository port for transcode settings persistence.
|
||||
#[async_trait]
|
||||
pub trait TranscodeSettingsRepository: Send + Sync {
|
||||
/// Load the persisted cleanup TTL. Returns None if no row exists yet.
|
||||
async fn load_cleanup_ttl(&self) -> DomainResult<Option<u32>>;
|
||||
/// Persist the cleanup TTL (upsert — always row id=1).
|
||||
async fn save_cleanup_ttl(&self, hours: u32) -> DomainResult<()>;
|
||||
}
|
||||
|
||||
@@ -31,7 +31,10 @@ impl UserService {
|
||||
}
|
||||
|
||||
let email = Email::try_from(email)?;
|
||||
let user = User::new(subject, email);
|
||||
let mut user = User::new(subject, email);
|
||||
if self.user_repository.count_users().await? == 0 {
|
||||
user.is_admin = true;
|
||||
}
|
||||
self.user_repository.save(&user).await?;
|
||||
Ok(user)
|
||||
}
|
||||
@@ -53,7 +56,10 @@ impl UserService {
|
||||
password_hash: &str,
|
||||
) -> DomainResult<User> {
|
||||
let email = Email::try_from(email)?;
|
||||
let user = User::new_local(email, password_hash);
|
||||
let mut user = User::new_local(email, password_hash);
|
||||
if self.user_repository.count_users().await? == 0 {
|
||||
user.is_admin = true;
|
||||
}
|
||||
self.user_repository.save(&user).await?;
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::db::DatabasePool;
|
||||
use domain::{ActivityLogRepository, ChannelRepository, ScheduleRepository, UserRepository};
|
||||
use domain::{ActivityLogRepository, ChannelRepository, ProviderConfigRepository, ScheduleRepository, TranscodeSettingsRepository, UserRepository};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FactoryError {
|
||||
@@ -70,6 +70,21 @@ pub async fn build_activity_log_repository(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn build_provider_config_repository(
|
||||
pool: &DatabasePool,
|
||||
) -> FactoryResult<Arc<dyn ProviderConfigRepository>> {
|
||||
match pool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
DatabasePool::Sqlite(pool) => Ok(Arc::new(
|
||||
crate::provider_config_repository::SqliteProviderConfigRepository::new(pool.clone()),
|
||||
)),
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => Err(FactoryError::NotImplemented(
|
||||
"ProviderConfigRepository not implemented for this database".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn build_schedule_repository(
|
||||
pool: &DatabasePool,
|
||||
) -> FactoryResult<Arc<dyn ScheduleRepository>> {
|
||||
@@ -88,3 +103,57 @@ pub async fn build_schedule_repository(
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn build_transcode_settings_repository(
|
||||
pool: &DatabasePool,
|
||||
) -> FactoryResult<Arc<dyn TranscodeSettingsRepository>> {
|
||||
match pool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
DatabasePool::Sqlite(p) => Ok(Arc::new(
|
||||
crate::transcode_settings_repository::SqliteTranscodeSettingsRepository::new(p.clone()),
|
||||
)),
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => Err(FactoryError::NotImplemented(
|
||||
"TranscodeSettingsRepository not implemented for this database".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
pub struct LocalFilesBundle {
|
||||
pub provider: Arc<crate::LocalFilesProvider>,
|
||||
pub local_index: Arc<crate::LocalIndex>,
|
||||
pub transcode_manager: Option<Arc<crate::TranscodeManager>>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
pub async fn build_local_files_bundle(
|
||||
pool: &DatabasePool,
|
||||
root_dir: std::path::PathBuf,
|
||||
transcode_dir: Option<std::path::PathBuf>,
|
||||
cleanup_ttl_hours: u32,
|
||||
base_url: String,
|
||||
) -> FactoryResult<LocalFilesBundle> {
|
||||
match pool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
DatabasePool::Sqlite(sqlite_pool) => {
|
||||
let cfg = crate::LocalFilesConfig {
|
||||
root_dir,
|
||||
base_url,
|
||||
transcode_dir: transcode_dir.clone(),
|
||||
cleanup_ttl_hours,
|
||||
};
|
||||
let idx = Arc::new(crate::LocalIndex::new(&cfg, sqlite_pool.clone()).await);
|
||||
let tm = transcode_dir.as_ref().map(|td| {
|
||||
std::fs::create_dir_all(td).ok();
|
||||
crate::TranscodeManager::new(td.clone(), cleanup_ttl_hours)
|
||||
});
|
||||
let provider = Arc::new(crate::LocalFilesProvider::new(Arc::clone(&idx), cfg, tm.clone()));
|
||||
Ok(LocalFilesBundle { provider, local_index: idx, transcode_manager: tm })
|
||||
}
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => Err(FactoryError::NotImplemented(
|
||||
"local-files requires SQLite".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Connection details for a single Jellyfin instance.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct JellyfinConfig {
|
||||
/// e.g. `"http://192.168.1.10:8096"` — no trailing slash
|
||||
pub base_url: String,
|
||||
|
||||
@@ -20,7 +20,9 @@ pub mod jellyfin;
|
||||
pub mod provider_registry;
|
||||
mod activity_log_repository;
|
||||
mod channel_repository;
|
||||
mod provider_config_repository;
|
||||
mod schedule_repository;
|
||||
mod transcode_settings_repository;
|
||||
mod user_repository;
|
||||
|
||||
#[cfg(feature = "local-files")]
|
||||
@@ -37,7 +39,13 @@ pub use user_repository::SqliteUserRepository;
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use channel_repository::SqliteChannelRepository;
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use provider_config_repository::SqliteProviderConfigRepository;
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use schedule_repository::SqliteScheduleRepository;
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use transcode_settings_repository::SqliteTranscodeSettingsRepository;
|
||||
|
||||
pub use domain::TranscodeSettingsRepository;
|
||||
|
||||
#[cfg(feature = "jellyfin")]
|
||||
pub use jellyfin::{JellyfinConfig, JellyfinMediaProvider};
|
||||
|
||||
4
k-tv-backend/infra/src/provider_config_repository/mod.rs
Normal file
4
k-tv-backend/infra/src/provider_config_repository/mod.rs
Normal file
@@ -0,0 +1,4 @@
|
||||
#[cfg(feature = "sqlite")]
|
||||
mod sqlite;
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use sqlite::SqliteProviderConfigRepository;
|
||||
63
k-tv-backend/infra/src/provider_config_repository/sqlite.rs
Normal file
63
k-tv-backend/infra/src/provider_config_repository/sqlite.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use async_trait::async_trait;
|
||||
use domain::{DomainError, DomainResult, ProviderConfigRepository, ProviderConfigRow};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SqliteProviderConfigRepository {
|
||||
pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
impl SqliteProviderConfigRepository {
|
||||
pub fn new(pool: sqlx::SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProviderConfigRepository for SqliteProviderConfigRepository {
|
||||
async fn get_all(&self) -> DomainResult<Vec<ProviderConfigRow>> {
|
||||
let rows: Vec<(String, String, i64, String)> = sqlx::query_as(
|
||||
"SELECT provider_type, config_json, enabled, updated_at FROM provider_configs",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|(provider_type, config_json, enabled, updated_at)| ProviderConfigRow {
|
||||
provider_type,
|
||||
config_json,
|
||||
enabled: enabled != 0,
|
||||
updated_at,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn upsert(&self, row: &ProviderConfigRow) -> DomainResult<()> {
|
||||
sqlx::query(
|
||||
r#"INSERT INTO provider_configs (provider_type, config_json, enabled, updated_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(provider_type) DO UPDATE SET
|
||||
config_json = excluded.config_json,
|
||||
enabled = excluded.enabled,
|
||||
updated_at = excluded.updated_at"#,
|
||||
)
|
||||
.bind(&row.provider_type)
|
||||
.bind(&row.config_json)
|
||||
.bind(row.enabled as i64)
|
||||
.bind(&row.updated_at)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, provider_type: &str) -> DomainResult<()> {
|
||||
sqlx::query("DELETE FROM provider_configs WHERE provider_type = ?")
|
||||
.bind(provider_type)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
#[cfg(feature = "sqlite")]
|
||||
mod sqlite;
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub use sqlite::SqliteTranscodeSettingsRepository;
|
||||
@@ -0,0 +1,34 @@
|
||||
use async_trait::async_trait;
|
||||
use domain::{DomainError, DomainResult, TranscodeSettingsRepository};
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
pub struct SqliteTranscodeSettingsRepository {
|
||||
pool: SqlitePool,
|
||||
}
|
||||
|
||||
impl SqliteTranscodeSettingsRepository {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TranscodeSettingsRepository for SqliteTranscodeSettingsRepository {
|
||||
async fn load_cleanup_ttl(&self) -> DomainResult<Option<u32>> {
|
||||
let row: Option<(i64,)> =
|
||||
sqlx::query_as("SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1")
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
Ok(row.map(|(h,)| h as u32))
|
||||
}
|
||||
|
||||
async fn save_cleanup_ttl(&self, hours: u32) -> DomainResult<()> {
|
||||
sqlx::query("UPDATE transcode_settings SET cleanup_ttl_hours = ? WHERE id = 1")
|
||||
.bind(hours as i64)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ pub(super) struct UserRow {
|
||||
pub subject: String,
|
||||
pub email: String,
|
||||
pub password_hash: Option<String>,
|
||||
pub is_admin: i64,
|
||||
pub created_at: String,
|
||||
}
|
||||
|
||||
@@ -36,6 +37,7 @@ impl TryFrom<UserRow> for User {
|
||||
row.subject,
|
||||
email,
|
||||
row.password_hash,
|
||||
row.is_admin != 0,
|
||||
created_at,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ impl UserRepository for PostgresUserRepository {
|
||||
async fn find_by_id(&self, id: Uuid) -> DomainResult<Option<User>> {
|
||||
let id_str = id.to_string();
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE id = $1",
|
||||
"SELECT id, subject, email, password_hash, is_admin, created_at FROM users WHERE id = $1",
|
||||
)
|
||||
.bind(&id_str)
|
||||
.fetch_optional(&self.pool)
|
||||
@@ -34,7 +34,7 @@ impl UserRepository for PostgresUserRepository {
|
||||
|
||||
async fn find_by_subject(&self, subject: &str) -> DomainResult<Option<User>> {
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE subject = $1",
|
||||
"SELECT id, subject, email, password_hash, is_admin, created_at FROM users WHERE subject = $1",
|
||||
)
|
||||
.bind(subject)
|
||||
.fetch_optional(&self.pool)
|
||||
@@ -46,7 +46,7 @@ impl UserRepository for PostgresUserRepository {
|
||||
|
||||
async fn find_by_email(&self, email: &str) -> DomainResult<Option<User>> {
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE email = $1",
|
||||
"SELECT id, subject, email, password_hash, is_admin, created_at FROM users WHERE email = $1",
|
||||
)
|
||||
.bind(email)
|
||||
.fetch_optional(&self.pool)
|
||||
@@ -62,18 +62,20 @@ impl UserRepository for PostgresUserRepository {
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO users (id, subject, email, password_hash, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
INSERT INTO users (id, subject, email, password_hash, is_admin, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
subject = excluded.subject,
|
||||
email = excluded.email,
|
||||
password_hash = excluded.password_hash
|
||||
password_hash = excluded.password_hash,
|
||||
is_admin = excluded.is_admin
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&user.subject)
|
||||
.bind(user.email.as_ref())
|
||||
.bind(&user.password_hash)
|
||||
.bind(user.is_admin)
|
||||
.bind(&created_at)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
@@ -99,4 +101,12 @@ impl UserRepository for PostgresUserRepository {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn count_users(&self) -> DomainResult<u64> {
|
||||
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users")
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
Ok(count as u64)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ impl UserRepository for SqliteUserRepository {
|
||||
async fn find_by_id(&self, id: Uuid) -> DomainResult<Option<User>> {
|
||||
let id_str = id.to_string();
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE id = ?",
|
||||
"SELECT id, subject, email, password_hash, is_admin, created_at FROM users WHERE id = ?",
|
||||
)
|
||||
.bind(&id_str)
|
||||
.fetch_optional(&self.pool)
|
||||
@@ -34,7 +34,7 @@ impl UserRepository for SqliteUserRepository {
|
||||
|
||||
async fn find_by_subject(&self, subject: &str) -> DomainResult<Option<User>> {
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE subject = ?",
|
||||
"SELECT id, subject, email, password_hash, is_admin, created_at FROM users WHERE subject = ?",
|
||||
)
|
||||
.bind(subject)
|
||||
.fetch_optional(&self.pool)
|
||||
@@ -46,7 +46,7 @@ impl UserRepository for SqliteUserRepository {
|
||||
|
||||
async fn find_by_email(&self, email: &str) -> DomainResult<Option<User>> {
|
||||
let row: Option<UserRow> = sqlx::query_as(
|
||||
"SELECT id, subject, email, password_hash, created_at FROM users WHERE email = ?",
|
||||
"SELECT id, subject, email, password_hash, is_admin, created_at FROM users WHERE email = ?",
|
||||
)
|
||||
.bind(email)
|
||||
.fetch_optional(&self.pool)
|
||||
@@ -62,18 +62,20 @@ impl UserRepository for SqliteUserRepository {
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO users (id, subject, email, password_hash, created_at)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
INSERT INTO users (id, subject, email, password_hash, is_admin, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
subject = excluded.subject,
|
||||
email = excluded.email,
|
||||
password_hash = excluded.password_hash
|
||||
password_hash = excluded.password_hash,
|
||||
is_admin = excluded.is_admin
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&user.subject)
|
||||
.bind(user.email.as_ref())
|
||||
.bind(&user.password_hash)
|
||||
.bind(user.is_admin as i64)
|
||||
.bind(&created_at)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
@@ -100,6 +102,14 @@ impl UserRepository for SqliteUserRepository {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn count_users(&self) -> DomainResult<u64> {
|
||||
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users")
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
Ok(count as u64)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
ALTER TABLE users ADD COLUMN is_admin INTEGER NOT NULL DEFAULT 0;
|
||||
|
||||
CREATE TABLE provider_configs (
|
||||
provider_type TEXT PRIMARY KEY,
|
||||
config_json TEXT NOT NULL,
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
@@ -0,0 +1,180 @@
|
||||
"use client";
|
||||
|
||||
import { useState } from "react";
|
||||
import { useProviderConfigs, useUpdateProvider, useTestProvider } from "@/hooks/use-admin-providers";
|
||||
import { useConfig } from "@/hooks/use-config";
|
||||
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Input } from "@/components/ui/input";
|
||||
import { Label } from "@/components/ui/label";
|
||||
import { Switch } from "@/components/ui/switch";
|
||||
import { CheckCircle, XCircle, Loader2 } from "lucide-react";
|
||||
import { ApiRequestError } from "@/lib/api";
|
||||
|
||||
const PROVIDER_FIELDS: Record<
|
||||
string,
|
||||
Array<{ key: string; label: string; type?: string; required?: boolean }>
|
||||
> = {
|
||||
jellyfin: [
|
||||
{ key: "base_url", label: "Base URL", required: true },
|
||||
{ key: "api_key", label: "API Key", type: "password", required: true },
|
||||
{ key: "user_id", label: "User ID", required: true },
|
||||
],
|
||||
local_files: [
|
||||
{ key: "files_dir", label: "Files Directory", required: true },
|
||||
{ key: "transcode_dir", label: "Transcode Directory" },
|
||||
{ key: "cleanup_ttl_hours", label: "Cleanup TTL Hours" },
|
||||
],
|
||||
};
|
||||
|
||||
interface ProviderCardProps {
|
||||
providerType: string;
|
||||
existingConfig?: { config_json: Record<string, string>; enabled: boolean };
|
||||
}
|
||||
|
||||
function ProviderCard({ providerType, existingConfig }: ProviderCardProps) {
|
||||
const fields = PROVIDER_FIELDS[providerType] ?? [];
|
||||
const [formValues, setFormValues] = useState<Record<string, string>>(
|
||||
() => existingConfig?.config_json ?? {},
|
||||
);
|
||||
const [enabled, setEnabled] = useState(existingConfig?.enabled ?? true);
|
||||
const [conflictError, setConflictError] = useState(false);
|
||||
const [testResult, setTestResult] = useState<{ ok: boolean; message: string } | null>(null);
|
||||
|
||||
const updateProvider = useUpdateProvider();
|
||||
const testProvider = useTestProvider();
|
||||
|
||||
const handleSave = async () => {
|
||||
setConflictError(false);
|
||||
try {
|
||||
await updateProvider.mutateAsync({
|
||||
type: providerType,
|
||||
payload: { config_json: formValues, enabled },
|
||||
});
|
||||
} catch (e: unknown) {
|
||||
if (e instanceof ApiRequestError && e.status === 409) {
|
||||
setConflictError(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const handleTest = async () => {
|
||||
setTestResult(null);
|
||||
const result = await testProvider.mutateAsync({
|
||||
type: providerType,
|
||||
payload: { config_json: formValues, enabled: true },
|
||||
});
|
||||
setTestResult(result);
|
||||
};
|
||||
|
||||
return (
|
||||
<Card className="border-zinc-800 bg-zinc-900">
|
||||
<CardHeader className="flex flex-row items-center justify-between pb-3">
|
||||
<CardTitle className="text-sm font-medium capitalize text-zinc-100">
|
||||
{providerType.replace("_", " ")}
|
||||
</CardTitle>
|
||||
<div className="flex items-center gap-2">
|
||||
<span className="text-xs text-zinc-400">Enabled</span>
|
||||
<Switch checked={enabled} onCheckedChange={setEnabled} />
|
||||
</div>
|
||||
</CardHeader>
|
||||
<CardContent className="space-y-3">
|
||||
{conflictError && (
|
||||
<div className="rounded border border-yellow-600/40 bg-yellow-950/30 px-3 py-2 text-xs text-yellow-400">
|
||||
UI config disabled — set <code>CONFIG_SOURCE=db</code> on the server
|
||||
</div>
|
||||
)}
|
||||
{fields.map((field) => (
|
||||
<div key={field.key} className="space-y-1">
|
||||
<Label className="text-xs text-zinc-400">
|
||||
{field.label}
|
||||
{field.required && <span className="ml-1 text-red-400">*</span>}
|
||||
</Label>
|
||||
<Input
|
||||
type={field.type ?? "text"}
|
||||
value={formValues[field.key] ?? ""}
|
||||
onChange={(e) =>
|
||||
setFormValues((prev) => ({ ...prev, [field.key]: e.target.value }))
|
||||
}
|
||||
placeholder={
|
||||
field.type === "password" ? "••••••••" : `Enter ${field.label.toLowerCase()}`
|
||||
}
|
||||
className="h-8 border-zinc-700 bg-zinc-800 text-xs text-zinc-100"
|
||||
/>
|
||||
</div>
|
||||
))}
|
||||
{testResult && (
|
||||
<div
|
||||
className={`flex items-center gap-2 rounded px-3 py-2 text-xs ${
|
||||
testResult.ok
|
||||
? "bg-green-950/30 text-green-400"
|
||||
: "bg-red-950/30 text-red-400"
|
||||
}`}
|
||||
>
|
||||
{testResult.ok ? (
|
||||
<CheckCircle className="h-3.5 w-3.5" />
|
||||
) : (
|
||||
<XCircle className="h-3.5 w-3.5" />
|
||||
)}
|
||||
{testResult.message}
|
||||
</div>
|
||||
)}
|
||||
<div className="flex gap-2 pt-1">
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
onClick={handleTest}
|
||||
disabled={testProvider.isPending}
|
||||
className="border-zinc-700 text-xs"
|
||||
>
|
||||
{testProvider.isPending && <Loader2 className="mr-1 h-3 w-3 animate-spin" />}
|
||||
Test Connection
|
||||
</Button>
|
||||
<Button
|
||||
size="sm"
|
||||
onClick={handleSave}
|
||||
disabled={updateProvider.isPending}
|
||||
className="text-xs"
|
||||
>
|
||||
{updateProvider.isPending && <Loader2 className="mr-1 h-3 w-3 animate-spin" />}
|
||||
Save
|
||||
</Button>
|
||||
</div>
|
||||
</CardContent>
|
||||
</Card>
|
||||
);
|
||||
}
|
||||
|
||||
export function ProviderSettingsPanel() {
|
||||
const { data: config } = useConfig();
|
||||
const { data: providerConfigs = [] } = useProviderConfigs();
|
||||
|
||||
const availableTypes = config?.available_provider_types ?? [];
|
||||
|
||||
return (
|
||||
<div className="space-y-4 p-6">
|
||||
<div>
|
||||
<h2 className="text-sm font-semibold text-zinc-100">Provider Configuration</h2>
|
||||
<p className="mt-0.5 text-xs text-zinc-500">
|
||||
Configure media providers. Requires <code>CONFIG_SOURCE=db</code> on the server.
|
||||
</p>
|
||||
</div>
|
||||
{availableTypes.length === 0 ? (
|
||||
<p className="text-xs text-zinc-500">No providers available in this build.</p>
|
||||
) : (
|
||||
<div className="space-y-4">
|
||||
{availableTypes.map((type) => {
|
||||
const existing = providerConfigs.find((c) => c.provider_type === type);
|
||||
return (
|
||||
<ProviderCard
|
||||
key={type}
|
||||
providerType={type}
|
||||
existingConfig={existing}
|
||||
/>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
27
k-tv-frontend/app/(main)/admin/layout.tsx
Normal file
27
k-tv-frontend/app/(main)/admin/layout.tsx
Normal file
@@ -0,0 +1,27 @@
|
||||
"use client";
|
||||
|
||||
import { useEffect, type ReactNode } from "react";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { useCurrentUser } from "@/hooks/use-auth";
|
||||
import { useAuthContext } from "@/context/auth-context";
|
||||
|
||||
export default function AdminLayout({ children }: { children: ReactNode }) {
|
||||
const { token, isLoaded } = useAuthContext();
|
||||
const router = useRouter();
|
||||
const { data: user, isLoading } = useCurrentUser();
|
||||
|
||||
useEffect(() => {
|
||||
if (!isLoaded) return;
|
||||
if (!token) {
|
||||
router.replace("/login");
|
||||
return;
|
||||
}
|
||||
if (!isLoading && user && !user.is_admin) {
|
||||
router.replace("/dashboard");
|
||||
}
|
||||
}, [isLoaded, token, user, isLoading, router]);
|
||||
|
||||
if (!isLoaded || isLoading || !user?.is_admin) return null;
|
||||
|
||||
return <>{children}</>;
|
||||
}
|
||||
@@ -1,21 +1,15 @@
|
||||
"use client";
|
||||
|
||||
import { useState, useEffect } from "react";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { useAuthContext } from "@/context/auth-context";
|
||||
import { useActivityLog, useServerLogs } from "@/hooks/use-admin";
|
||||
import { ServerLogsPanel } from "./components/server-logs-panel";
|
||||
import { ActivityLogPanel } from "./components/activity-log-panel";
|
||||
import { ProviderSettingsPanel } from "./components/provider-settings-panel";
|
||||
import { Tabs, TabsList, TabsTrigger, TabsContent } from "@/components/ui/tabs";
|
||||
|
||||
export default function AdminPage() {
|
||||
const { token, isLoaded } = useAuthContext();
|
||||
const router = useRouter();
|
||||
|
||||
useEffect(() => {
|
||||
if (isLoaded && !token) {
|
||||
router.replace("/login");
|
||||
}
|
||||
}, [isLoaded, token, router]);
|
||||
const { token } = useAuthContext();
|
||||
|
||||
const { lines, connected } = useServerLogs(token);
|
||||
const [localLines, setLocalLines] = useState(lines);
|
||||
@@ -27,8 +21,6 @@ export default function AdminPage() {
|
||||
|
||||
const { data: events = [], isLoading } = useActivityLog(token);
|
||||
|
||||
if (!isLoaded || !token) return null;
|
||||
|
||||
return (
|
||||
<div className="flex flex-1 flex-col overflow-hidden">
|
||||
{/* Page header */}
|
||||
@@ -37,6 +29,25 @@ export default function AdminPage() {
|
||||
<span className="text-xs text-zinc-500">System monitoring & logs</span>
|
||||
</div>
|
||||
|
||||
<Tabs defaultValue="logs" className="flex flex-1 flex-col overflow-hidden">
|
||||
<div className="border-b border-zinc-800 px-6">
|
||||
<TabsList className="h-9 bg-transparent p-0 gap-1">
|
||||
<TabsTrigger
|
||||
value="logs"
|
||||
className="rounded-none border-b-2 border-transparent px-3 py-1.5 text-xs data-[state=active]:border-zinc-100 data-[state=active]:bg-transparent data-[state=active]:text-zinc-100"
|
||||
>
|
||||
Logs
|
||||
</TabsTrigger>
|
||||
<TabsTrigger
|
||||
value="providers"
|
||||
className="rounded-none border-b-2 border-transparent px-3 py-1.5 text-xs data-[state=active]:border-zinc-100 data-[state=active]:bg-transparent data-[state=active]:text-zinc-100"
|
||||
>
|
||||
Providers
|
||||
</TabsTrigger>
|
||||
</TabsList>
|
||||
</div>
|
||||
|
||||
<TabsContent value="logs" className="flex min-h-0 flex-1 overflow-hidden mt-0">
|
||||
{/* Two-column layout */}
|
||||
<div className="flex min-h-0 flex-1 overflow-hidden">
|
||||
{/* Left: server logs */}
|
||||
@@ -53,6 +64,12 @@ export default function AdminPage() {
|
||||
<ActivityLogPanel events={events} isLoading={isLoading} />
|
||||
</div>
|
||||
</div>
|
||||
</TabsContent>
|
||||
|
||||
<TabsContent value="providers" className="flex-1 overflow-auto mt-0">
|
||||
<ProviderSettingsPanel />
|
||||
</TabsContent>
|
||||
</Tabs>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
17
k-tv-frontend/app/(main)/components/admin-nav-link.tsx
Normal file
17
k-tv-frontend/app/(main)/components/admin-nav-link.tsx
Normal file
@@ -0,0 +1,17 @@
|
||||
"use client";
|
||||
|
||||
import Link from "next/link";
|
||||
import { useCurrentUser } from "@/hooks/use-auth";
|
||||
|
||||
export function AdminNavLink() {
|
||||
const { data: user } = useCurrentUser();
|
||||
if (!user?.is_admin) return null;
|
||||
return (
|
||||
<Link
|
||||
href="/admin"
|
||||
className="rounded-md px-3 py-1.5 text-sm text-zinc-400 transition-colors hover:bg-zinc-800 hover:text-zinc-100"
|
||||
>
|
||||
Admin
|
||||
</Link>
|
||||
);
|
||||
}
|
||||
@@ -1,12 +1,12 @@
|
||||
import Link from "next/link";
|
||||
import { type ReactNode } from "react";
|
||||
import { NavAuth } from "./components/nav-auth";
|
||||
import { AdminNavLink } from "./components/admin-nav-link";
|
||||
|
||||
const NAV_LINKS = [
|
||||
{ href: "/tv", label: "TV" },
|
||||
{ href: "/guide", label: "Guide" },
|
||||
{ href: "/dashboard", label: "Dashboard" },
|
||||
{ href: "/admin", label: "Admin" },
|
||||
{ href: "/docs", label: "Docs" },
|
||||
];
|
||||
|
||||
@@ -33,6 +33,9 @@ export default function MainLayout({ children }: { children: ReactNode }) {
|
||||
</Link>
|
||||
</li>
|
||||
))}
|
||||
<li>
|
||||
<AdminNavLink />
|
||||
</li>
|
||||
</ul>
|
||||
<div className="ml-2 border-l border-zinc-800 pl-2">
|
||||
<NavAuth />
|
||||
|
||||
@@ -884,7 +884,7 @@ function TvPageContent() {
|
||||
</div>
|
||||
|
||||
{/* Bottom: banner + info row */}
|
||||
<div className="flex flex-col gap-3 bg-gradient-to-t from-black/80 via-black/40 to-transparent p-5 pt-20">
|
||||
<div className="flex flex-col gap-3 bg-linear-to-t from-black/80 via-black/40 to-transparent p-5 pt-20">
|
||||
{showBanner && nextSlot && (
|
||||
<UpNextBanner
|
||||
nextShowTitle={nextSlot.item.title}
|
||||
|
||||
52
k-tv-frontend/hooks/use-admin-providers.ts
Normal file
52
k-tv-frontend/hooks/use-admin-providers.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
"use client";
|
||||
|
||||
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
|
||||
import { api } from "@/lib/api";
|
||||
import { useAuthContext } from "@/context/auth-context";
|
||||
|
||||
export function useProviderConfigs() {
|
||||
const { token } = useAuthContext();
|
||||
return useQuery({
|
||||
queryKey: ["admin", "providers"],
|
||||
queryFn: () => api.admin.providers.getProviders(token!),
|
||||
enabled: !!token,
|
||||
});
|
||||
}
|
||||
|
||||
export function useUpdateProvider() {
|
||||
const { token } = useAuthContext();
|
||||
const qc = useQueryClient();
|
||||
return useMutation({
|
||||
mutationFn: ({
|
||||
type,
|
||||
payload,
|
||||
}: {
|
||||
type: string;
|
||||
payload: { config_json: Record<string, string>; enabled: boolean };
|
||||
}) => api.admin.providers.updateProvider(token!, type, payload),
|
||||
onSuccess: () => qc.invalidateQueries({ queryKey: ["admin", "providers"] }),
|
||||
});
|
||||
}
|
||||
|
||||
export function useDeleteProvider() {
|
||||
const { token } = useAuthContext();
|
||||
const qc = useQueryClient();
|
||||
return useMutation({
|
||||
mutationFn: (type: string) =>
|
||||
api.admin.providers.deleteProvider(token!, type),
|
||||
onSuccess: () => qc.invalidateQueries({ queryKey: ["admin", "providers"] }),
|
||||
});
|
||||
}
|
||||
|
||||
export function useTestProvider() {
|
||||
const { token } = useAuthContext();
|
||||
return useMutation({
|
||||
mutationFn: ({
|
||||
type,
|
||||
payload,
|
||||
}: {
|
||||
type: string;
|
||||
payload: { config_json: Record<string, string>; enabled: boolean };
|
||||
}) => api.admin.providers.testProvider(token!, type, payload),
|
||||
});
|
||||
}
|
||||
@@ -15,6 +15,8 @@ import type {
|
||||
TranscodeSettings,
|
||||
TranscodeStats,
|
||||
ActivityEvent,
|
||||
ProviderConfig,
|
||||
ProviderTestResult,
|
||||
} from "@/lib/types";
|
||||
|
||||
const API_BASE =
|
||||
@@ -179,6 +181,36 @@ export const api = {
|
||||
admin: {
|
||||
activity: (token: string) =>
|
||||
request<ActivityEvent[]>("/admin/activity", { token }),
|
||||
|
||||
providers: {
|
||||
getProviders: (token: string) =>
|
||||
request<ProviderConfig[]>("/admin/providers", { token }),
|
||||
|
||||
updateProvider: (
|
||||
token: string,
|
||||
type: string,
|
||||
payload: { config_json: Record<string, string>; enabled: boolean },
|
||||
) =>
|
||||
request<ProviderConfig>(`/admin/providers/${type}`, {
|
||||
method: "PUT",
|
||||
body: JSON.stringify(payload),
|
||||
token,
|
||||
}),
|
||||
|
||||
deleteProvider: (token: string, type: string) =>
|
||||
request<void>(`/admin/providers/${type}`, { method: "DELETE", token }),
|
||||
|
||||
testProvider: (
|
||||
token: string,
|
||||
type: string,
|
||||
payload: { config_json: Record<string, string>; enabled: boolean },
|
||||
) =>
|
||||
request<ProviderTestResult>(`/admin/providers/${type}/test`, {
|
||||
method: "POST",
|
||||
body: JSON.stringify(payload),
|
||||
token,
|
||||
}),
|
||||
},
|
||||
},
|
||||
|
||||
schedule: {
|
||||
|
||||
@@ -131,6 +131,18 @@ export interface ConfigResponse {
|
||||
providers: ProviderInfo[];
|
||||
/** Primary provider capabilities — kept for backward compat. */
|
||||
provider_capabilities: ProviderCapabilities;
|
||||
available_provider_types: string[];
|
||||
}
|
||||
|
||||
export interface ProviderConfig {
|
||||
provider_type: string;
|
||||
config_json: Record<string, string>;
|
||||
enabled: boolean;
|
||||
}
|
||||
|
||||
export interface ProviderTestResult {
|
||||
ok: boolean;
|
||||
message: string;
|
||||
}
|
||||
|
||||
// Auth
|
||||
@@ -145,6 +157,7 @@ export interface UserResponse {
|
||||
id: string;
|
||||
email: string;
|
||||
created_at: string;
|
||||
is_admin: boolean;
|
||||
}
|
||||
|
||||
// Channels
|
||||
|
||||
Reference in New Issue
Block a user