From 9d792249c903abed71f955abcf9c26f2ca1b0b11 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Mon, 16 Mar 2026 04:24:39 +0100 Subject: [PATCH] feat: implement transcode settings repository and integrate with local-files provider --- k-tv-backend/Cargo.lock | 1 - k-tv-backend/api/Cargo.toml | 3 +- k-tv-backend/api/src/main.rs | 178 ++++++++---------- k-tv-backend/api/src/poller.rs | 74 +++++--- .../api/src/routes/admin_providers.rs | 48 ++--- k-tv-backend/api/src/routes/files.rs | 14 +- k-tv-backend/api/src/state.rs | 20 +- k-tv-backend/domain/src/repositories.rs | 9 + k-tv-backend/infra/src/factory.rs | 56 +++++- k-tv-backend/infra/src/lib.rs | 5 + .../src/transcode_settings_repository/mod.rs | 4 + .../transcode_settings_repository/sqlite.rs | 34 ++++ 12 files changed, 269 insertions(+), 177 deletions(-) create mode 100644 k-tv-backend/infra/src/transcode_settings_repository/mod.rs create mode 100644 k-tv-backend/infra/src/transcode_settings_repository/sqlite.rs diff --git a/k-tv-backend/Cargo.lock b/k-tv-backend/Cargo.lock index 1957419..8ab9857 100644 --- a/k-tv-backend/Cargo.lock +++ b/k-tv-backend/Cargo.lock @@ -86,7 +86,6 @@ dependencies = [ "serde", "serde_json", "serde_qs", - "sqlx", "thiserror 2.0.17", "time", "tokio", diff --git a/k-tv-backend/api/Cargo.toml b/k-tv-backend/api/Cargo.toml index ce26728..226ce3c 100644 --- a/k-tv-backend/api/Cargo.toml +++ b/k-tv-backend/api/Cargo.toml @@ -11,7 +11,7 @@ postgres = ["infra/postgres"] auth-oidc = ["infra/auth-oidc"] auth-jwt = ["infra/auth-jwt"] jellyfin = ["infra/jellyfin"] -local-files = ["infra/local-files", "dep:tokio-util", "dep:sqlx"] +local-files = ["infra/local-files", "dep:tokio-util"] [profile.release] strip = true @@ -65,4 +65,3 @@ async-trait = "0.1" dotenvy = "0.15.7" time = "0.3" tokio-util = { version = "0.7", features = ["io"], optional = true } -sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite"], optional = true } diff --git a/k-tv-backend/api/src/main.rs b/k-tv-backend/api/src/main.rs index 52d9840..bd5c89b 100644 --- a/k-tv-backend/api/src/main.rs +++ b/k-tv-backend/api/src/main.rs @@ -16,6 +16,8 @@ use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberI use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService}; use infra::factory::{build_activity_log_repository, build_channel_repository, build_provider_config_repository, build_schedule_repository, build_user_repository}; +#[cfg(feature = "local-files")] +use infra::factory::build_transcode_settings_repository; use infra::run_migrations; use k_core::http::server::{ServerConfig, apply_standard_middleware}; use tokio::net::TcpListener; @@ -79,14 +81,7 @@ async fn main() -> anyhow::Result<()> { let db_pool = k_core::db::connect(&db_config).await?; run_migrations(&db_pool).await?; - - #[cfg(feature = "local-files")] - let raw_sqlite_pool: Option = match &db_pool { - #[cfg(feature = "sqlite")] - k_core::db::DatabasePool::Sqlite(p) => Some(p.clone()), - #[allow(unreachable_patterns)] - _ => None, - }; + let db_pool = Arc::new(db_pool); let user_repo = build_user_repository(&db_pool).await?; let channel_repo = build_channel_repository(&db_pool).await?; @@ -101,8 +96,6 @@ async fn main() -> anyhow::Result<()> { let mut local_index: Option> = None; #[cfg(feature = "local-files")] let mut transcode_manager: Option> = None; - #[cfg(feature = "local-files")] - let mut sqlite_pool_for_state: Option = None; let mut registry = infra::ProviderRegistry::new(); @@ -123,51 +116,43 @@ async fn main() -> anyhow::Result<()> { } #[cfg(feature = "local-files")] "local_files" => { - if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool { - if let Ok(cfg_map) = serde_json::from_str::>(&row.config_json) { - if let Some(files_dir) = cfg_map.get("files_dir") { - let transcode_dir = cfg_map.get("transcode_dir") - .filter(|s| !s.is_empty()) - .map(std::path::PathBuf::from); - let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours") - .and_then(|s| s.parse().ok()) - .unwrap_or(24); - let lf_cfg = infra::LocalFilesConfig { - root_dir: std::path::PathBuf::from(files_dir), - base_url: config.base_url.clone(), - transcode_dir: transcode_dir.clone(), - cleanup_ttl_hours, - }; - tracing::info!("Loading local-files provider from DB config at {:?}", files_dir); - let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); - local_index = Some(Arc::clone(&idx)); - let scan_idx = Arc::clone(&idx); - tokio::spawn(async move { scan_idx.rescan().await; }); - let tm = transcode_dir.as_ref().map(|td| { - std::fs::create_dir_all(td).ok(); - tracing::info!("Transcoding enabled; cache dir: {:?}", td); - let tm = infra::TranscodeManager::new(td.clone(), cleanup_ttl_hours); - // Load persisted TTL override from transcode_settings table. - let tm_clone = Arc::clone(&tm); - let pool_clone = sqlite_pool.clone(); - tokio::spawn(async move { - if let Ok(row) = sqlx::query_as::<_, (i64,)>( - "SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1", - ) - .fetch_one(&pool_clone) - .await - { - tm_clone.set_cleanup_ttl(row.0 as u32); - } - }); - tm - }); - registry.register( - "local", - Arc::new(infra::LocalFilesProvider::new(Arc::clone(&idx), lf_cfg, tm.clone())), - ); - transcode_manager = tm; - sqlite_pool_for_state = Some(sqlite_pool.clone()); + if let Ok(cfg_map) = serde_json::from_str::>(&row.config_json) { + if let Some(files_dir) = cfg_map.get("files_dir") { + let transcode_dir = cfg_map.get("transcode_dir") + .filter(|s| !s.is_empty()) + .map(std::path::PathBuf::from); + let cleanup_ttl_hours: u32 = cfg_map.get("cleanup_ttl_hours") + .and_then(|s| s.parse().ok()) + .unwrap_or(24); + tracing::info!("Loading local-files provider from DB config at {:?}", files_dir); + match infra::factory::build_local_files_bundle( + &db_pool, + std::path::PathBuf::from(files_dir), + transcode_dir, + cleanup_ttl_hours, + config.base_url.clone(), + ).await { + Ok(bundle) => { + let scan_idx = Arc::clone(&bundle.local_index); + tokio::spawn(async move { scan_idx.rescan().await; }); + if let Some(ref tm) = bundle.transcode_manager { + tracing::info!("Transcoding enabled"); + // Load persisted TTL override from transcode_settings table. + let tm_clone = Arc::clone(tm); + let repo = build_transcode_settings_repository(&db_pool).await.ok(); + tokio::spawn(async move { + if let Some(r) = repo { + if let Ok(Some(ttl)) = r.load_cleanup_ttl().await { + tm_clone.set_cleanup_ttl(ttl); + } + } + }); + } + registry.register("local", bundle.provider); + transcode_manager = bundle.transcode_manager; + local_index = Some(bundle.local_index); + } + Err(e) => tracing::warn!("Failed to build local-files provider: {}", e), } } } @@ -192,48 +177,34 @@ async fn main() -> anyhow::Result<()> { #[cfg(feature = "local-files")] if let Some(dir) = &config.local_files_dir { - if let k_core::db::DatabasePool::Sqlite(ref sqlite_pool) = db_pool { - tracing::info!("Media provider: local files at {:?}", dir); - let lf_cfg = infra::LocalFilesConfig { - root_dir: dir.clone(), - base_url: config.base_url.clone(), - transcode_dir: config.transcode_dir.clone(), - cleanup_ttl_hours: config.transcode_cleanup_ttl_hours, - }; - let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); - local_index = Some(Arc::clone(&idx)); - let scan_idx = Arc::clone(&idx); - tokio::spawn(async move { scan_idx.rescan().await; }); - - // Build TranscodeManager if TRANSCODE_DIR is set. - let tm = config.transcode_dir.as_ref().map(|td| { - std::fs::create_dir_all(td).ok(); - tracing::info!("Transcoding enabled; cache dir: {:?}", td); - let tm = infra::TranscodeManager::new(td.clone(), config.transcode_cleanup_ttl_hours); - // Load persisted TTL from DB. - let tm_clone = Arc::clone(&tm); - let pool_clone = sqlite_pool.clone(); - tokio::spawn(async move { - if let Ok(row) = sqlx::query_as::<_, (i64,)>( - "SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1", - ) - .fetch_one(&pool_clone) - .await - { - tm_clone.set_cleanup_ttl(row.0 as u32); - } - }); - tm - }); - - registry.register( - "local", - Arc::new(infra::LocalFilesProvider::new(idx, lf_cfg, tm.clone())), - ); - transcode_manager = tm; - sqlite_pool_for_state = Some(sqlite_pool.clone()); - } else { - tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR"); + tracing::info!("Media provider: local files at {:?}", dir); + match infra::factory::build_local_files_bundle( + &db_pool, + dir.clone(), + config.transcode_dir.clone(), + config.transcode_cleanup_ttl_hours, + config.base_url.clone(), + ).await { + Ok(bundle) => { + let scan_idx = Arc::clone(&bundle.local_index); + tokio::spawn(async move { scan_idx.rescan().await; }); + if let Some(ref tm) = bundle.transcode_manager { + tracing::info!("Transcoding enabled; cache dir: {:?}", config.transcode_dir); + let tm_clone = Arc::clone(tm); + let repo = build_transcode_settings_repository(&db_pool).await.ok(); + tokio::spawn(async move { + if let Some(r) = repo { + if let Ok(Some(ttl)) = r.load_cleanup_ttl().await { + tm_clone.set_cleanup_ttl(ttl); + } + } + }); + } + registry.register("local", bundle.provider); + transcode_manager = bundle.transcode_manager; + local_index = Some(bundle.local_index); + } + Err(e) => tracing::warn!("local-files requires SQLite; ignoring LOCAL_FILES_DIR: {}", e), } } } @@ -263,6 +234,9 @@ async fn main() -> anyhow::Result<()> { schedule_repo, ); + #[cfg(feature = "local-files")] + let transcode_settings_repo = build_transcode_settings_repository(&db_pool).await.ok(); + #[allow(unused_mut)] let mut state = AppState::new( user_service, @@ -275,12 +249,12 @@ async fn main() -> anyhow::Result<()> { log_tx, log_history, activity_log_repo, + db_pool, + #[cfg(feature = "local-files")] + transcode_settings_repo, ) .await?; - #[cfg(feature = "local-files")] - { state.raw_sqlite_pool = raw_sqlite_pool; } - #[cfg(feature = "local-files")] if let Some(idx) = local_index { *state.local_index.write().await = Some(idx); @@ -289,10 +263,6 @@ async fn main() -> anyhow::Result<()> { if let Some(tm) = transcode_manager { *state.transcode_manager.write().await = Some(tm); } - #[cfg(feature = "local-files")] - if let Some(pool) = sqlite_pool_for_state { - *state.sqlite_pool.write().await = Some(pool); - } let server_config = ServerConfig { cors_origins: config.cors_allowed_origins.clone(), diff --git a/k-tv-backend/api/src/poller.rs b/k-tv-backend/api/src/poller.rs index 20a5d10..300e33b 100644 --- a/k-tv-backend/api/src/poller.rs +++ b/k-tv-backend/api/src/poller.rs @@ -17,7 +17,7 @@ use domain::{ChannelRepository, DomainError, DomainEvent, ScheduleEngineService} /// Per-channel poller state. #[derive(Debug)] -struct ChannelPollState { +pub struct ChannelPollState { /// ID of the last slot we saw as current (None = no signal). last_slot_id: Option, /// Wall-clock instant of the last poll for this channel. @@ -80,13 +80,11 @@ pub(crate) async fn poll_tick( // Find the current slot let current_slot_id = match schedule_engine.get_active_schedule(channel.id, now).await { - Ok(Some(schedule)) => { - schedule - .slots - .iter() - .find(|s| s.start_at <= now && now < s.end_at) - .map(|s| s.id) - } + Ok(Some(schedule)) => schedule + .slots + .iter() + .find(|s| s.start_at <= now && now < s.end_at) + .map(|s| s.id), Ok(None) => None, Err(DomainError::NoActiveSchedule(_)) => None, Err(DomainError::ChannelNotFound(_)) => { @@ -109,7 +107,9 @@ pub(crate) async fn poll_tick( // State changed — emit appropriate event match ¤t_slot_id { Some(slot_id) => { - if let Ok(Some(schedule)) = schedule_engine.get_active_schedule(channel.id, now).await { + if let Ok(Some(schedule)) = + schedule_engine.get_active_schedule(channel.id, now).await + { if let Some(slot) = schedule.slots.iter().find(|s| s.id == *slot_id).cloned() { let _ = event_tx.send(DomainEvent::BroadcastTransition { channel_id: channel.id, @@ -137,12 +137,12 @@ mod tests { use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; + use domain::value_objects::{ChannelId, ContentType, UserId}; use domain::{ Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry, MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities, - ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, StreamingProtocol, + ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, }; - use domain::value_objects::{ChannelId, ContentType, UserId}; use tokio::sync::broadcast; use uuid::Uuid; @@ -188,14 +188,20 @@ mod tests { ) -> DomainResult> { Ok(self.active.clone()) } - async fn find_latest(&self, _channel_id: ChannelId) -> DomainResult> { + async fn find_latest( + &self, + _channel_id: ChannelId, + ) -> DomainResult> { Ok(self.active.clone()) } async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { self.saved.lock().unwrap().push(schedule.clone()); Ok(()) } - async fn find_playback_history(&self, _channel_id: ChannelId) -> DomainResult> { + async fn find_playback_history( + &self, + _channel_id: ChannelId, + ) -> DomainResult> { Ok(vec![]) } async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> { @@ -207,13 +213,21 @@ mod tests { #[async_trait] impl IProviderRegistry for MockRegistry { - async fn fetch_items(&self, _provider_id: &str, _filter: &MediaFilter) -> DomainResult> { + async fn fetch_items( + &self, + _provider_id: &str, + _filter: &MediaFilter, + ) -> DomainResult> { Ok(vec![]) } async fn fetch_by_id(&self, _item_id: &MediaItemId) -> DomainResult> { Ok(None) } - async fn get_stream_url(&self, _item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult { + async fn get_stream_url( + &self, + _item_id: &MediaItemId, + _quality: &StreamQuality, + ) -> DomainResult { unimplemented!() } fn provider_ids(&self) -> Vec { @@ -228,10 +242,18 @@ mod tests { async fn list_collections(&self, _provider_id: &str) -> DomainResult> { unimplemented!() } - async fn list_series(&self, _provider_id: &str, _collection_id: Option<&str>) -> DomainResult> { + async fn list_series( + &self, + _provider_id: &str, + _collection_id: Option<&str>, + ) -> DomainResult> { unimplemented!() } - async fn list_genres(&self, _provider_id: &str, _content_type: Option<&ContentType>) -> DomainResult> { + async fn list_genres( + &self, + _provider_id: &str, + _content_type: Option<&ContentType>, + ) -> DomainResult> { unimplemented!() } } @@ -318,7 +340,10 @@ mod tests { let event = event_rx.try_recv().expect("expected an event"); match event { - DomainEvent::BroadcastTransition { channel_id: cid, slot: s } => { + DomainEvent::BroadcastTransition { + channel_id: cid, + slot: s, + } => { assert_eq!(cid, channel_id); assert_eq!(s.id, slot_id); } @@ -388,11 +413,18 @@ mod tests { async fn find_latest(&self, _: ChannelId) -> DomainResult> { Ok(None) } - async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> { Ok(()) } - async fn find_playback_history(&self, _: ChannelId) -> DomainResult> { + async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> { + Ok(()) + } + async fn find_playback_history( + &self, + _: ChannelId, + ) -> DomainResult> { Ok(vec![]) } - async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> { Ok(()) } + async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> { + Ok(()) + } } let now = Utc::now(); diff --git a/k-tv-backend/api/src/routes/admin_providers.rs b/k-tv-backend/api/src/routes/admin_providers.rs index 16809c3..7eadd7c 100644 --- a/k-tv-backend/api/src/routes/admin_providers.rs +++ b/k-tv-backend/api/src/routes/admin_providers.rs @@ -138,39 +138,25 @@ async fn rebuild_registry(state: &AppState) -> DomainResult<()> { let base_url = state.config.base_url.clone(); - let sqlite_pool = match &state.raw_sqlite_pool { - Some(p) => p.clone(), - None => { - tracing::warn!("local_files provider requires SQLite; skipping"); + match infra::factory::build_local_files_bundle( + &state.db_pool, + files_dir, + transcode_dir, + cleanup_ttl_hours, + base_url, + ).await { + Ok(bundle) => { + let scan_idx = Arc::clone(&bundle.local_index); + tokio::spawn(async move { scan_idx.rescan().await; }); + new_registry.register("local", bundle.provider); + *state.local_index.write().await = Some(bundle.local_index); + *state.transcode_manager.write().await = bundle.transcode_manager; + } + Err(e) => { + tracing::warn!("local_files provider requires SQLite; skipping: {}", e); continue; } - }; - - let lf_cfg = infra::LocalFilesConfig { - root_dir: files_dir, - base_url, - transcode_dir: transcode_dir.clone(), - cleanup_ttl_hours, - }; - - let idx = Arc::new(infra::LocalIndex::new(&lf_cfg, sqlite_pool.clone()).await); - - let scan_idx = Arc::clone(&idx); - tokio::spawn(async move { scan_idx.rescan().await; }); - - let tm = transcode_dir.as_ref().map(|td| { - std::fs::create_dir_all(td).ok(); - infra::TranscodeManager::new(td.clone(), cleanup_ttl_hours) - }); - - new_registry.register( - "local", - Arc::new(infra::LocalFilesProvider::new(Arc::clone(&idx), lf_cfg, tm.clone())), - ); - - *state.local_index.write().await = Some(idx); - *state.transcode_manager.write().await = tm; - *state.sqlite_pool.write().await = Some(sqlite_pool); + } } _ => {} } diff --git a/k-tv-backend/api/src/routes/files.rs b/k-tv-backend/api/src/routes/files.rs index 4d3f595..2671549 100644 --- a/k-tv-backend/api/src/routes/files.rs +++ b/k-tv-backend/api/src/routes/files.rs @@ -269,15 +269,11 @@ async fn update_transcode_settings( CurrentUser(_user): CurrentUser, Json(req): Json, ) -> Result, ApiError> { - let pool = state.sqlite_pool.read().await.clone() - .ok_or_else(|| ApiError::not_implemented("sqlite not available"))?; - - let ttl = req.cleanup_ttl_hours as i64; - sqlx::query("UPDATE transcode_settings SET cleanup_ttl_hours = ? WHERE id = 1") - .bind(ttl) - .execute(&pool) - .await - .map_err(|e| ApiError::internal(e.to_string()))?; + if let Some(repo) = &state.transcode_settings_repo { + repo.save_cleanup_ttl(req.cleanup_ttl_hours) + .await + .map_err(|e| ApiError::internal(e.to_string()))?; + } let tm_opt = state.transcode_manager.read().await.clone(); if let Some(tm) = tm_opt { diff --git a/k-tv-backend/api/src/state.rs b/k-tv-backend/api/src/state.rs index a7c3f79..90df108 100644 --- a/k-tv-backend/api/src/state.rs +++ b/k-tv-backend/api/src/state.rs @@ -16,6 +16,9 @@ use crate::config::Config; use crate::events::EventBus; use crate::log_layer::LogLine; use domain::{ActivityLogRepository, ChannelService, ProviderConfigRepository, ScheduleEngineService, UserService}; +#[cfg(feature = "local-files")] +use domain::TranscodeSettingsRepository; +use k_core::db::DatabasePool; #[derive(Clone)] pub struct AppState { @@ -43,12 +46,11 @@ pub struct AppState { /// TranscodeManager for FFmpeg HLS transcoding (requires TRANSCODE_DIR). #[cfg(feature = "local-files")] pub transcode_manager: Arc>>>, - /// SQLite pool for transcode settings CRUD. + /// Repository for transcode settings persistence. #[cfg(feature = "local-files")] - pub sqlite_pool: Arc>>, - /// Raw sqlite pool — always present when running SQLite, used for local-files hot-reload. - #[cfg(feature = "local-files")] - pub raw_sqlite_pool: Option, + pub transcode_settings_repo: Option>, + /// Database pool — used by infra factory functions for hot-reload. + pub db_pool: Arc, } impl AppState { @@ -63,6 +65,9 @@ impl AppState { log_tx: broadcast::Sender, log_history: Arc>>, activity_log_repo: Arc, + db_pool: Arc, + #[cfg(feature = "local-files")] + transcode_settings_repo: Option>, ) -> anyhow::Result { let cookie_key = Key::derive_from(config.cookie_secret.as_bytes()); @@ -144,9 +149,8 @@ impl AppState { #[cfg(feature = "local-files")] transcode_manager: Arc::new(tokio::sync::RwLock::new(None)), #[cfg(feature = "local-files")] - sqlite_pool: Arc::new(tokio::sync::RwLock::new(None)), - #[cfg(feature = "local-files")] - raw_sqlite_pool: None, + transcode_settings_repo, + db_pool, }) } } diff --git a/k-tv-backend/domain/src/repositories.rs b/k-tv-backend/domain/src/repositories.rs index 433ee96..e8f0ede 100644 --- a/k-tv-backend/domain/src/repositories.rs +++ b/k-tv-backend/domain/src/repositories.rs @@ -111,3 +111,12 @@ pub trait ActivityLogRepository: Send + Sync { ) -> DomainResult<()>; async fn recent(&self, limit: u32) -> DomainResult>; } + +/// Repository port for transcode settings persistence. +#[async_trait] +pub trait TranscodeSettingsRepository: Send + Sync { + /// Load the persisted cleanup TTL. Returns None if no row exists yet. + async fn load_cleanup_ttl(&self) -> DomainResult>; + /// Persist the cleanup TTL (upsert — always row id=1). + async fn save_cleanup_ttl(&self, hours: u32) -> DomainResult<()>; +} diff --git a/k-tv-backend/infra/src/factory.rs b/k-tv-backend/infra/src/factory.rs index 3830469..c463789 100644 --- a/k-tv-backend/infra/src/factory.rs +++ b/k-tv-backend/infra/src/factory.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::db::DatabasePool; -use domain::{ActivityLogRepository, ChannelRepository, ProviderConfigRepository, ScheduleRepository, UserRepository}; +use domain::{ActivityLogRepository, ChannelRepository, ProviderConfigRepository, ScheduleRepository, TranscodeSettingsRepository, UserRepository}; #[derive(Debug, thiserror::Error)] pub enum FactoryError { @@ -103,3 +103,57 @@ pub async fn build_schedule_repository( )), } } + +pub async fn build_transcode_settings_repository( + pool: &DatabasePool, +) -> FactoryResult> { + match pool { + #[cfg(feature = "sqlite")] + DatabasePool::Sqlite(p) => Ok(Arc::new( + crate::transcode_settings_repository::SqliteTranscodeSettingsRepository::new(p.clone()), + )), + #[allow(unreachable_patterns)] + _ => Err(FactoryError::NotImplemented( + "TranscodeSettingsRepository not implemented for this database".to_string(), + )), + } +} + +#[cfg(feature = "local-files")] +pub struct LocalFilesBundle { + pub provider: Arc, + pub local_index: Arc, + pub transcode_manager: Option>, +} + +#[cfg(feature = "local-files")] +pub async fn build_local_files_bundle( + pool: &DatabasePool, + root_dir: std::path::PathBuf, + transcode_dir: Option, + cleanup_ttl_hours: u32, + base_url: String, +) -> FactoryResult { + match pool { + #[cfg(feature = "sqlite")] + DatabasePool::Sqlite(sqlite_pool) => { + let cfg = crate::LocalFilesConfig { + root_dir, + base_url, + transcode_dir: transcode_dir.clone(), + cleanup_ttl_hours, + }; + let idx = Arc::new(crate::LocalIndex::new(&cfg, sqlite_pool.clone()).await); + let tm = transcode_dir.as_ref().map(|td| { + std::fs::create_dir_all(td).ok(); + crate::TranscodeManager::new(td.clone(), cleanup_ttl_hours) + }); + let provider = Arc::new(crate::LocalFilesProvider::new(Arc::clone(&idx), cfg, tm.clone())); + Ok(LocalFilesBundle { provider, local_index: idx, transcode_manager: tm }) + } + #[allow(unreachable_patterns)] + _ => Err(FactoryError::NotImplemented( + "local-files requires SQLite".to_string(), + )), + } +} diff --git a/k-tv-backend/infra/src/lib.rs b/k-tv-backend/infra/src/lib.rs index 7562b6e..9cc669c 100644 --- a/k-tv-backend/infra/src/lib.rs +++ b/k-tv-backend/infra/src/lib.rs @@ -22,6 +22,7 @@ mod activity_log_repository; mod channel_repository; mod provider_config_repository; mod schedule_repository; +mod transcode_settings_repository; mod user_repository; #[cfg(feature = "local-files")] @@ -41,6 +42,10 @@ pub use channel_repository::SqliteChannelRepository; pub use provider_config_repository::SqliteProviderConfigRepository; #[cfg(feature = "sqlite")] pub use schedule_repository::SqliteScheduleRepository; +#[cfg(feature = "sqlite")] +pub use transcode_settings_repository::SqliteTranscodeSettingsRepository; + +pub use domain::TranscodeSettingsRepository; #[cfg(feature = "jellyfin")] pub use jellyfin::{JellyfinConfig, JellyfinMediaProvider}; diff --git a/k-tv-backend/infra/src/transcode_settings_repository/mod.rs b/k-tv-backend/infra/src/transcode_settings_repository/mod.rs new file mode 100644 index 0000000..27203c0 --- /dev/null +++ b/k-tv-backend/infra/src/transcode_settings_repository/mod.rs @@ -0,0 +1,4 @@ +#[cfg(feature = "sqlite")] +mod sqlite; +#[cfg(feature = "sqlite")] +pub use sqlite::SqliteTranscodeSettingsRepository; diff --git a/k-tv-backend/infra/src/transcode_settings_repository/sqlite.rs b/k-tv-backend/infra/src/transcode_settings_repository/sqlite.rs new file mode 100644 index 0000000..16fc572 --- /dev/null +++ b/k-tv-backend/infra/src/transcode_settings_repository/sqlite.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use domain::{DomainError, DomainResult, TranscodeSettingsRepository}; +use sqlx::SqlitePool; + +pub struct SqliteTranscodeSettingsRepository { + pool: SqlitePool, +} + +impl SqliteTranscodeSettingsRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl TranscodeSettingsRepository for SqliteTranscodeSettingsRepository { + async fn load_cleanup_ttl(&self) -> DomainResult> { + let row: Option<(i64,)> = + sqlx::query_as("SELECT cleanup_ttl_hours FROM transcode_settings WHERE id = 1") + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(row.map(|(h,)| h as u32)) + } + + async fn save_cleanup_ttl(&self, hours: u32) -> DomainResult<()> { + sqlx::query("UPDATE transcode_settings SET cleanup_ttl_hours = ? WHERE id = 1") + .bind(hours as i64) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(()) + } +}