fix: local_files hot-reload via RwLock state fields + rebuild_registry local_files case
This commit is contained in:
@@ -80,6 +80,14 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let db_pool = k_core::db::connect(&db_config).await?;
|
let db_pool = k_core::db::connect(&db_config).await?;
|
||||||
run_migrations(&db_pool).await?;
|
run_migrations(&db_pool).await?;
|
||||||
|
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
let raw_sqlite_pool: Option<sqlx::SqlitePool> = match &db_pool {
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
k_core::db::DatabasePool::Sqlite(p) => Some(p.clone()),
|
||||||
|
#[allow(unreachable_patterns)]
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
let user_repo = build_user_repository(&db_pool).await?;
|
let user_repo = build_user_repository(&db_pool).await?;
|
||||||
let channel_repo = build_channel_repository(&db_pool).await?;
|
let channel_repo = build_channel_repository(&db_pool).await?;
|
||||||
let schedule_repo = build_schedule_repository(&db_pool).await?;
|
let schedule_repo = build_schedule_repository(&db_pool).await?;
|
||||||
@@ -220,10 +228,19 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
{
|
{ state.raw_sqlite_pool = raw_sqlite_pool; }
|
||||||
state.local_index = local_index;
|
|
||||||
state.transcode_manager = transcode_manager;
|
#[cfg(feature = "local-files")]
|
||||||
state.sqlite_pool = sqlite_pool_for_state;
|
if let Some(idx) = local_index {
|
||||||
|
*state.local_index.write().await = Some(idx);
|
||||||
|
}
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
if let Some(tm) = transcode_manager {
|
||||||
|
*state.transcode_manager.write().await = Some(tm);
|
||||||
|
}
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
if let Some(pool) = sqlite_pool_for_state {
|
||||||
|
*state.sqlite_pool.write().await = Some(pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
let server_config = ServerConfig {
|
let server_config = ServerConfig {
|
||||||
|
|||||||
@@ -113,6 +113,65 @@ async fn rebuild_registry(state: &AppState) -> DomainResult<()> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
"local_files" => {
|
||||||
|
let config: std::collections::HashMap<String, String> =
|
||||||
|
match serde_json::from_str(&row.config_json) {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let files_dir = match config.get("files_dir") {
|
||||||
|
Some(d) => std::path::PathBuf::from(d),
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let transcode_dir = config
|
||||||
|
.get("transcode_dir")
|
||||||
|
.filter(|s| !s.is_empty())
|
||||||
|
.map(std::path::PathBuf::from);
|
||||||
|
|
||||||
|
let cleanup_ttl_hours: u32 = config
|
||||||
|
.get("cleanup_ttl_hours")
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(24);
|
||||||
|
|
||||||
|
let base_url = state.config.base_url.clone();
|
||||||
|
|
||||||
|
let sqlite_pool = match &state.raw_sqlite_pool {
|
||||||
|
Some(p) => p.clone(),
|
||||||
|
None => {
|
||||||
|
tracing::warn!("local_files provider requires SQLite; skipping");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let lf_cfg = infra::LocalFilesConfig {
|
||||||
|
root_dir: files_dir,
|
||||||
|
base_url,
|
||||||
|
transcode_dir: transcode_dir.clone(),
|
||||||
|
cleanup_ttl_hours,
|
||||||
|
};
|
||||||
|
|
||||||
|
let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await);
|
||||||
|
|
||||||
|
let scan_idx = Arc::clone(&idx);
|
||||||
|
tokio::spawn(async move { scan_idx.rescan().await; });
|
||||||
|
|
||||||
|
let tm = transcode_dir.as_ref().map(|td| {
|
||||||
|
std::fs::create_dir_all(td).ok();
|
||||||
|
infra::TranscodeManager::new(td.clone(), cleanup_ttl_hours)
|
||||||
|
});
|
||||||
|
|
||||||
|
new_registry.register(
|
||||||
|
"local",
|
||||||
|
Arc::new(infra::LocalFilesProvider::new(Arc::clone(&idx), lf_cfg, tm.clone())),
|
||||||
|
);
|
||||||
|
|
||||||
|
*state.local_index.write().await = Some(idx);
|
||||||
|
*state.transcode_manager.write().await = tm;
|
||||||
|
*state.sqlite_pool.write().await = Some(sqlite_pool);
|
||||||
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -147,9 +147,7 @@ async fn trigger_rescan(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
CurrentUser(_user): CurrentUser,
|
CurrentUser(_user): CurrentUser,
|
||||||
) -> Result<Json<serde_json::Value>, ApiError> {
|
) -> Result<Json<serde_json::Value>, ApiError> {
|
||||||
let index = state
|
let index = state.local_index.read().await.clone()
|
||||||
.local_index
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| ApiError::not_implemented("no local files provider active"))?;
|
.ok_or_else(|| ApiError::not_implemented("no local files provider active"))?;
|
||||||
let count = index.rescan().await;
|
let count = index.rescan().await;
|
||||||
Ok(Json(serde_json::json!({ "items_found": count })))
|
Ok(Json(serde_json::json!({ "items_found": count })))
|
||||||
@@ -164,9 +162,7 @@ async fn transcode_playlist(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Path(id): Path<String>,
|
Path(id): Path<String>,
|
||||||
) -> Result<Response, ApiError> {
|
) -> Result<Response, ApiError> {
|
||||||
let tm = state
|
let tm = state.transcode_manager.read().await.clone()
|
||||||
.transcode_manager
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||||
|
|
||||||
let root = state.config.local_files_dir.as_ref().ok_or_else(|| {
|
let root = state.config.local_files_dir.as_ref().ok_or_else(|| {
|
||||||
@@ -219,9 +215,7 @@ async fn transcode_segment(
|
|||||||
return Err(ApiError::Forbidden("invalid segment path".into()));
|
return Err(ApiError::Forbidden("invalid segment path".into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let tm = state
|
let tm = state.transcode_manager.read().await.clone()
|
||||||
.transcode_manager
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||||
|
|
||||||
let file_path = tm.transcode_dir.join(&id).join(&segment);
|
let file_path = tm.transcode_dir.join(&id).join(&segment);
|
||||||
@@ -262,14 +256,12 @@ async fn get_transcode_settings(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
CurrentUser(_user): CurrentUser,
|
CurrentUser(_user): CurrentUser,
|
||||||
) -> Result<Json<TranscodeSettingsResponse>, ApiError> {
|
) -> Result<Json<TranscodeSettingsResponse>, ApiError> {
|
||||||
let pool = state
|
let pool = state.sqlite_pool.read().await.clone()
|
||||||
.sqlite_pool
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| ApiError::not_implemented("sqlite not available"))?;
|
.ok_or_else(|| ApiError::not_implemented("sqlite not available"))?;
|
||||||
|
|
||||||
let (ttl,): (i64,) =
|
let (ttl,): (i64,) =
|
||||||
sqlx::query_as("SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1")
|
sqlx::query_as("SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1")
|
||||||
.fetch_one(pool)
|
.fetch_one(&pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ApiError::internal(e.to_string()))?;
|
.map_err(|e| ApiError::internal(e.to_string()))?;
|
||||||
|
|
||||||
@@ -284,19 +276,18 @@ async fn update_transcode_settings(
|
|||||||
CurrentUser(_user): CurrentUser,
|
CurrentUser(_user): CurrentUser,
|
||||||
Json(req): Json<UpdateTranscodeSettingsRequest>,
|
Json(req): Json<UpdateTranscodeSettingsRequest>,
|
||||||
) -> Result<Json<TranscodeSettingsResponse>, ApiError> {
|
) -> Result<Json<TranscodeSettingsResponse>, ApiError> {
|
||||||
let pool = state
|
let pool = state.sqlite_pool.read().await.clone()
|
||||||
.sqlite_pool
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| ApiError::not_implemented("sqlite not available"))?;
|
.ok_or_else(|| ApiError::not_implemented("sqlite not available"))?;
|
||||||
|
|
||||||
let ttl = req.cleanup_ttl_hours as i64;
|
let ttl = req.cleanup_ttl_hours as i64;
|
||||||
sqlx::query("UPDATE transcode_settings SET cleanup_ttl_hours = ? WHERE id = 1")
|
sqlx::query("UPDATE transcode_settings SET cleanup_ttl_hours = ? WHERE id = 1")
|
||||||
.bind(ttl)
|
.bind(ttl)
|
||||||
.execute(pool)
|
.execute(&pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ApiError::internal(e.to_string()))?;
|
.map_err(|e| ApiError::internal(e.to_string()))?;
|
||||||
|
|
||||||
if let Some(tm) = &state.transcode_manager {
|
let tm_opt = state.transcode_manager.read().await.clone();
|
||||||
|
if let Some(tm) = tm_opt {
|
||||||
tm.set_cleanup_ttl(req.cleanup_ttl_hours);
|
tm.set_cleanup_ttl(req.cleanup_ttl_hours);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -310,9 +301,7 @@ async fn get_transcode_stats(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
CurrentUser(_user): CurrentUser,
|
CurrentUser(_user): CurrentUser,
|
||||||
) -> Result<Json<TranscodeStatsResponse>, ApiError> {
|
) -> Result<Json<TranscodeStatsResponse>, ApiError> {
|
||||||
let tm = state
|
let tm = state.transcode_manager.read().await.clone()
|
||||||
.transcode_manager
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||||
let (cache_size_bytes, item_count) = tm.cache_stats().await;
|
let (cache_size_bytes, item_count) = tm.cache_stats().await;
|
||||||
Ok(Json(TranscodeStatsResponse {
|
Ok(Json(TranscodeStatsResponse {
|
||||||
@@ -326,9 +315,7 @@ async fn clear_transcode_cache(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
CurrentUser(_user): CurrentUser,
|
CurrentUser(_user): CurrentUser,
|
||||||
) -> Result<StatusCode, ApiError> {
|
) -> Result<StatusCode, ApiError> {
|
||||||
let tm = state
|
let tm = state.transcode_manager.read().await.clone()
|
||||||
.transcode_manager
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
.ok_or_else(|| ApiError::not_implemented("TRANSCODE_DIR not configured"))?;
|
||||||
tm.clear_cache()
|
tm.clear_cache()
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -39,13 +39,16 @@ pub struct AppState {
|
|||||||
pub activity_log_repo: Arc<dyn ActivityLogRepository>,
|
pub activity_log_repo: Arc<dyn ActivityLogRepository>,
|
||||||
/// Index for the local-files provider, used by the rescan route.
|
/// Index for the local-files provider, used by the rescan route.
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
pub local_index: Option<Arc<infra::LocalIndex>>,
|
pub local_index: Arc<tokio::sync::RwLock<Option<Arc<infra::LocalIndex>>>>,
|
||||||
/// TranscodeManager for FFmpeg HLS transcoding (requires TRANSCODE_DIR).
|
/// TranscodeManager for FFmpeg HLS transcoding (requires TRANSCODE_DIR).
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
pub transcode_manager: Option<Arc<infra::TranscodeManager>>,
|
pub transcode_manager: Arc<tokio::sync::RwLock<Option<Arc<infra::TranscodeManager>>>>,
|
||||||
/// SQLite pool for transcode settings CRUD.
|
/// SQLite pool for transcode settings CRUD.
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
pub sqlite_pool: Option<sqlx::SqlitePool>,
|
pub sqlite_pool: Arc<tokio::sync::RwLock<Option<sqlx::SqlitePool>>>,
|
||||||
|
/// Raw sqlite pool — always present when running SQLite, used for local-files hot-reload.
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
pub raw_sqlite_pool: Option<sqlx::SqlitePool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
@@ -137,11 +140,13 @@ impl AppState {
|
|||||||
log_history,
|
log_history,
|
||||||
activity_log_repo,
|
activity_log_repo,
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
local_index: None,
|
local_index: Arc::new(tokio::sync::RwLock::new(None)),
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
transcode_manager: None,
|
transcode_manager: Arc::new(tokio::sync::RwLock::new(None)),
|
||||||
#[cfg(feature = "local-files")]
|
#[cfg(feature = "local-files")]
|
||||||
sqlite_pool: None,
|
sqlite_pool: Arc::new(tokio::sync::RwLock::new(None)),
|
||||||
|
#[cfg(feature = "local-files")]
|
||||||
|
raw_sqlite_pool: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user