From a3a421c0ac5280c9aec5225021ab374d8f863b0e Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 20 Mar 2026 00:15:01 +0100 Subject: [PATCH] feat(infra): add SqliteLibraryRepository --- k-tv-backend/infra/src/lib.rs | 3 + k-tv-backend/infra/src/library_repository.rs | 387 +++++++++++++++++++ 2 files changed, 390 insertions(+) create mode 100644 k-tv-backend/infra/src/library_repository.rs diff --git a/k-tv-backend/infra/src/lib.rs b/k-tv-backend/infra/src/lib.rs index 9cc669c..b915328 100644 --- a/k-tv-backend/infra/src/lib.rs +++ b/k-tv-backend/infra/src/lib.rs @@ -20,6 +20,7 @@ pub mod jellyfin; pub mod provider_registry; mod activity_log_repository; mod channel_repository; +mod library_repository; mod provider_config_repository; mod schedule_repository; mod transcode_settings_repository; @@ -44,6 +45,8 @@ pub use provider_config_repository::SqliteProviderConfigRepository; pub use schedule_repository::SqliteScheduleRepository; #[cfg(feature = "sqlite")] pub use transcode_settings_repository::SqliteTranscodeSettingsRepository; +#[cfg(feature = "sqlite")] +pub use library_repository::SqliteLibraryRepository; pub use domain::TranscodeSettingsRepository; diff --git a/k-tv-backend/infra/src/library_repository.rs b/k-tv-backend/infra/src/library_repository.rs new file mode 100644 index 0000000..9da4925 --- /dev/null +++ b/k-tv-backend/infra/src/library_repository.rs @@ -0,0 +1,387 @@ +//! SQLite implementation of ILibraryRepository. + +use async_trait::async_trait; +use sqlx::SqlitePool; + +use domain::{ + ContentType, DomainError, DomainResult, ILibraryRepository, + LibraryCollection, LibraryItem, LibrarySearchFilter, LibrarySyncLogEntry, LibrarySyncResult, +}; + +pub struct SqliteLibraryRepository { + pool: SqlitePool, +} + +impl SqliteLibraryRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +fn content_type_str(ct: &ContentType) -> &'static str { + match ct { + ContentType::Movie => "movie", + ContentType::Episode => "episode", + ContentType::Short => "short", + } +} + +fn parse_content_type(s: &str) -> ContentType { + match s { + "episode" => ContentType::Episode, + "short" => ContentType::Short, + _ => ContentType::Movie, + } +} + +#[async_trait] +impl ILibraryRepository for SqliteLibraryRepository { + async fn search(&self, filter: &LibrarySearchFilter) -> DomainResult<(Vec, u32)> { + let mut conditions: Vec = vec![]; + if let Some(ref p) = filter.provider_id { + conditions.push(format!("provider_id = '{}'", p.replace('\'', "''"))); + } + if let Some(ref ct) = filter.content_type { + conditions.push(format!("content_type = '{}'", content_type_str(ct))); + } + if let Some(ref st) = filter.search_term { + conditions.push(format!("title LIKE '%{}%'", st.replace('\'', "''"))); + } + if let Some(ref cid) = filter.collection_id { + conditions.push(format!("collection_id = '{}'", cid.replace('\'', "''"))); + } + if let Some(decade) = filter.decade { + let end = decade + 10; + conditions.push(format!("year >= {} AND year < {}", decade, end)); + } + if let Some(min) = filter.min_duration_secs { + conditions.push(format!("duration_secs >= {}", min)); + } + if let Some(max) = filter.max_duration_secs { + conditions.push(format!("duration_secs <= {}", max)); + } + if !filter.series_names.is_empty() { + let quoted: Vec = filter.series_names.iter() + .map(|s| format!("'{}'", s.replace('\'', "''"))) + .collect(); + conditions.push(format!("series_name IN ({})", quoted.join(","))); + } + if !filter.genres.is_empty() { + let genre_conditions: Vec = filter.genres.iter() + .map(|g| format!("EXISTS (SELECT 1 FROM json_each(library_items.genres) WHERE value = '{}')", g.replace('\'', "''"))) + .collect(); + conditions.push(format!("({})", genre_conditions.join(" OR "))); + } + + let where_clause = if conditions.is_empty() { + String::new() + } else { + format!("WHERE {}", conditions.join(" AND ")) + }; + + let count_sql = format!("SELECT COUNT(*) FROM library_items {}", where_clause); + let total: i64 = sqlx::query_scalar(&count_sql) + .fetch_one(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + + let items_sql = format!( + "SELECT * FROM library_items {} ORDER BY title ASC LIMIT {} OFFSET {}", + where_clause, filter.limit, filter.offset + ); + + let rows = sqlx::query_as::<_, LibraryItemRow>(&items_sql) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + + Ok((rows.into_iter().map(Into::into).collect(), total as u32)) + } + + async fn get_by_id(&self, id: &str) -> DomainResult> { + let row = sqlx::query_as::<_, LibraryItemRow>( + "SELECT * FROM library_items WHERE id = ?" + ) + .bind(id) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(row.map(Into::into)) + } + + async fn list_collections(&self, provider_id: Option<&str>) -> DomainResult> { + let rows: Vec<(String, Option, Option)> = if let Some(p) = provider_id { + sqlx::query_as::<_, (String, Option, Option)>( + "SELECT DISTINCT collection_id, collection_name, collection_type + FROM library_items WHERE collection_id IS NOT NULL AND provider_id = ? + ORDER BY collection_name ASC" + ).bind(p).fetch_all(&self.pool).await + } else { + sqlx::query_as::<_, (String, Option, Option)>( + "SELECT DISTINCT collection_id, collection_name, collection_type + FROM library_items WHERE collection_id IS NOT NULL + ORDER BY collection_name ASC" + ).fetch_all(&self.pool).await + }.map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + + Ok(rows.into_iter().map(|(id, name, ct)| LibraryCollection { + id, + name: name.unwrap_or_default(), + collection_type: ct, + }).collect()) + } + + async fn list_series(&self, provider_id: Option<&str>) -> DomainResult> { + let rows: Vec<(String,)> = if let Some(p) = provider_id { + sqlx::query_as( + "SELECT DISTINCT series_name FROM library_items + WHERE series_name IS NOT NULL AND provider_id = ? ORDER BY series_name ASC" + ).bind(p).fetch_all(&self.pool).await + } else { + sqlx::query_as( + "SELECT DISTINCT series_name FROM library_items + WHERE series_name IS NOT NULL ORDER BY series_name ASC" + ).fetch_all(&self.pool).await + }.map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + + Ok(rows.into_iter().map(|(s,)| s).collect()) + } + + async fn list_genres(&self, content_type: Option<&ContentType>, provider_id: Option<&str>) -> DomainResult> { + let sql = match (content_type, provider_id) { + (Some(ct), Some(p)) => format!( + "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je + WHERE li.content_type = '{}' AND li.provider_id = '{}' ORDER BY je.value ASC", + content_type_str(ct), p.replace('\'', "''") + ), + (Some(ct), None) => format!( + "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je + WHERE li.content_type = '{}' ORDER BY je.value ASC", + content_type_str(ct) + ), + (None, Some(p)) => format!( + "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je + WHERE li.provider_id = '{}' ORDER BY je.value ASC", + p.replace('\'', "''") + ), + (None, None) => "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je ORDER BY je.value ASC".to_string(), + }; + let rows: Vec<(String,)> = sqlx::query_as(&sql) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(rows.into_iter().map(|(s,)| s).collect()) + } + + async fn upsert_items(&self, _provider_id: &str, items: Vec) -> DomainResult<()> { + let mut tx = self.pool.begin().await.map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + for item in items { + sqlx::query( + "INSERT OR REPLACE INTO library_items + (id, provider_id, external_id, title, content_type, duration_secs, + series_name, season_number, episode_number, year, genres, tags, + collection_id, collection_name, collection_type, thumbnail_url, synced_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + ) + .bind(&item.id).bind(&item.provider_id).bind(&item.external_id) + .bind(&item.title).bind(content_type_str(&item.content_type)) + .bind(item.duration_secs as i64) + .bind(&item.series_name).bind(item.season_number.map(|n| n as i64)) + .bind(item.episode_number.map(|n| n as i64)) + .bind(item.year.map(|n| n as i64)) + .bind(serde_json::to_string(&item.genres).unwrap_or_default()) + .bind(serde_json::to_string(&item.tags).unwrap_or_default()) + .bind(&item.collection_id).bind(&item.collection_name) + .bind(&item.collection_type).bind(&item.thumbnail_url) + .bind(&item.synced_at) + .execute(&mut *tx) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + } + tx.commit().await.map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + + async fn clear_provider(&self, provider_id: &str) -> DomainResult<()> { + sqlx::query("DELETE FROM library_items WHERE provider_id = ?") + .bind(provider_id) + .execute(&self.pool) + .await + .map(|_| ()) + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + + async fn log_sync_start(&self, provider_id: &str) -> DomainResult { + let now = chrono::Utc::now().to_rfc3339(); + let id = sqlx::query_scalar::<_, i64>( + "INSERT INTO library_sync_log (provider_id, started_at, status) + VALUES (?, ?, 'running') RETURNING id" + ) + .bind(provider_id).bind(&now) + .fetch_one(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(id) + } + + async fn log_sync_finish(&self, log_id: i64, result: &LibrarySyncResult) -> DomainResult<()> { + let now = chrono::Utc::now().to_rfc3339(); + let status = if result.error.is_none() { "done" } else { "error" }; + sqlx::query( + "UPDATE library_sync_log + SET finished_at = ?, items_found = ?, status = ?, error_msg = ? + WHERE id = ?" + ) + .bind(&now).bind(result.items_found as i64) + .bind(status).bind(&result.error).bind(log_id) + .execute(&self.pool) + .await + .map(|_| ()) + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + + async fn latest_sync_status(&self) -> DomainResult> { + let rows = sqlx::query_as::<_, SyncLogRow>( + "SELECT * FROM library_sync_log + WHERE id IN ( + SELECT MAX(id) FROM library_sync_log GROUP BY provider_id + ) + ORDER BY started_at DESC" + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + + Ok(rows.into_iter().map(|r| LibrarySyncLogEntry { + id: r.id, provider_id: r.provider_id, started_at: r.started_at, + finished_at: r.finished_at, items_found: r.items_found as u32, + status: r.status, error_msg: r.error_msg, + }).collect()) + } + + async fn is_sync_running(&self, provider_id: &str) -> DomainResult { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM library_sync_log WHERE provider_id = ? AND status = 'running'" + ) + .bind(provider_id) + .fetch_one(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(count > 0) + } +} + +// ── SQLx row types ───────────────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct LibraryItemRow { + id: String, provider_id: String, external_id: String, title: String, + content_type: String, duration_secs: i64, + series_name: Option, season_number: Option, episode_number: Option, + year: Option, genres: String, tags: String, + collection_id: Option, collection_name: Option, collection_type: Option, + thumbnail_url: Option, synced_at: String, +} + +impl From for LibraryItem { + fn from(r: LibraryItemRow) -> Self { + Self { + id: r.id, provider_id: r.provider_id, external_id: r.external_id, + title: r.title, content_type: parse_content_type(&r.content_type), + duration_secs: r.duration_secs as u32, + series_name: r.series_name, + season_number: r.season_number.map(|n| n as u32), + episode_number: r.episode_number.map(|n| n as u32), + year: r.year.map(|n| n as u16), + genres: serde_json::from_str(&r.genres).unwrap_or_default(), + tags: serde_json::from_str(&r.tags).unwrap_or_default(), + collection_id: r.collection_id, collection_name: r.collection_name, + collection_type: r.collection_type, thumbnail_url: r.thumbnail_url, + synced_at: r.synced_at, + } + } +} + +#[derive(sqlx::FromRow)] +struct SyncLogRow { + id: i64, provider_id: String, started_at: String, finished_at: Option, + items_found: i64, status: String, error_msg: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::SqlitePool; + use domain::{LibraryItem, LibrarySearchFilter, ContentType}; + + async fn setup() -> SqlitePool { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + sqlx::query( + "CREATE TABLE library_items ( + id TEXT PRIMARY KEY, provider_id TEXT NOT NULL, external_id TEXT NOT NULL, + title TEXT NOT NULL, content_type TEXT NOT NULL, duration_secs INTEGER NOT NULL DEFAULT 0, + series_name TEXT, season_number INTEGER, episode_number INTEGER, year INTEGER, + genres TEXT NOT NULL DEFAULT '[]', tags TEXT NOT NULL DEFAULT '[]', + collection_id TEXT, collection_name TEXT, collection_type TEXT, + thumbnail_url TEXT, synced_at TEXT NOT NULL + )" + ).execute(&pool).await.unwrap(); + sqlx::query( + "CREATE TABLE library_sync_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, provider_id TEXT NOT NULL, + started_at TEXT NOT NULL, finished_at TEXT, items_found INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'running', error_msg TEXT + )" + ).execute(&pool).await.unwrap(); + pool + } + + fn make_item(id: &str, provider: &str, title: &str) -> LibraryItem { + LibraryItem { + id: id.to_string(), provider_id: provider.to_string(), external_id: id.to_string(), + title: title.to_string(), content_type: ContentType::Movie, + duration_secs: 3600, series_name: None, season_number: None, episode_number: None, + year: Some(2020), genres: vec!["Action".to_string()], tags: vec![], + collection_id: None, collection_name: None, collection_type: None, + thumbnail_url: None, synced_at: "2026-03-19T00:00:00Z".to_string(), + } + } + + #[tokio::test] + async fn upsert_then_search_returns_items() { + let pool = setup().await; + let repo = SqliteLibraryRepository::new(pool); + let items = vec![make_item("jellyfin::1", "jellyfin", "Movie A")]; + repo.upsert_items("jellyfin", items).await.unwrap(); + + let (results, total) = repo.search(&LibrarySearchFilter { limit: 50, ..Default::default() }).await.unwrap(); + assert_eq!(total, 1); + assert_eq!(results[0].title, "Movie A"); + } + + #[tokio::test] + async fn clear_provider_removes_only_that_provider() { + let pool = setup().await; + let repo = SqliteLibraryRepository::new(pool); + repo.upsert_items("jellyfin", vec![make_item("jellyfin::1", "jellyfin", "Jelly Movie")]).await.unwrap(); + repo.upsert_items("local", vec![make_item("local::1", "local", "Local Movie")]).await.unwrap(); + repo.clear_provider("jellyfin").await.unwrap(); + + let (results, _) = repo.search(&LibrarySearchFilter { limit: 50, ..Default::default() }).await.unwrap(); + assert_eq!(results.len(), 1); + assert_eq!(results[0].provider_id, "local"); + } + + #[tokio::test] + async fn is_sync_running_reflects_status() { + let pool = setup().await; + let repo = SqliteLibraryRepository::new(pool); + assert!(!repo.is_sync_running("jellyfin").await.unwrap()); + let log_id = repo.log_sync_start("jellyfin").await.unwrap(); + assert!(repo.is_sync_running("jellyfin").await.unwrap()); + let result = domain::LibrarySyncResult { + provider_id: "jellyfin".to_string(), items_found: 5, duration_ms: 100, error: None, + }; + repo.log_sync_finish(log_id, &result).await.unwrap(); + assert!(!repo.is_sync_running("jellyfin").await.unwrap()); + } +}