//! SQLite implementation of ILibraryRepository. use async_trait::async_trait; use sqlx::SqlitePool; use domain::{ ContentType, DomainError, DomainResult, ILibraryRepository, LibraryCollection, LibraryItem, LibrarySearchFilter, LibrarySyncLogEntry, LibrarySyncResult, SeasonSummary, ShowSummary, }; pub struct SqliteLibraryRepository { pool: SqlitePool, } impl SqliteLibraryRepository { pub fn new(pool: SqlitePool) -> Self { Self { pool } } } fn content_type_str(ct: &ContentType) -> &'static str { match ct { ContentType::Movie => "movie", ContentType::Episode => "episode", ContentType::Short => "short", } } fn parse_content_type(s: &str) -> ContentType { match s { "episode" => ContentType::Episode, "short" => ContentType::Short, _ => ContentType::Movie, } } #[async_trait] impl ILibraryRepository for SqliteLibraryRepository { async fn search(&self, filter: &LibrarySearchFilter) -> DomainResult<(Vec, u32)> { let mut conditions: Vec = vec![]; if let Some(ref p) = filter.provider_id { conditions.push(format!("provider_id = '{}'", p.replace('\'', "''"))); } if let Some(ref ct) = filter.content_type { conditions.push(format!("content_type = '{}'", content_type_str(ct))); } if let Some(ref st) = filter.search_term { conditions.push(format!("title LIKE '%{}%'", st.replace('\'', "''"))); } if let Some(ref cid) = filter.collection_id { conditions.push(format!("collection_id = '{}'", cid.replace('\'', "''"))); } if let Some(decade) = filter.decade { let end = decade + 10; conditions.push(format!("year >= {} AND year < {}", decade, end)); } if let Some(min) = filter.min_duration_secs { conditions.push(format!("duration_secs >= {}", min)); } if let Some(max) = filter.max_duration_secs { conditions.push(format!("duration_secs <= {}", max)); } if !filter.series_names.is_empty() { let quoted: Vec = filter.series_names.iter() .map(|s| format!("'{}'", s.replace('\'', "''"))) .collect(); conditions.push(format!("series_name IN ({})", quoted.join(","))); } if !filter.genres.is_empty() { let genre_conditions: Vec = filter.genres.iter() .map(|g| format!("EXISTS (SELECT 1 FROM json_each(library_items.genres) WHERE value = '{}')", g.replace('\'', "''"))) .collect(); conditions.push(format!("({})", genre_conditions.join(" OR "))); } if let Some(sn) = filter.season_number { conditions.push(format!("season_number = {}", sn)); } let where_clause = if conditions.is_empty() { String::new() } else { format!("WHERE {}", conditions.join(" AND ")) }; let count_sql = format!("SELECT COUNT(*) FROM library_items {}", where_clause); let total: i64 = sqlx::query_scalar(&count_sql) .fetch_one(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; let items_sql = format!( "SELECT * FROM library_items {} ORDER BY title ASC LIMIT {} OFFSET {}", where_clause, filter.limit, filter.offset ); let rows = sqlx::query_as::<_, LibraryItemRow>(&items_sql) .fetch_all(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; Ok((rows.into_iter().map(Into::into).collect(), total as u32)) } async fn get_by_id(&self, id: &str) -> DomainResult> { let row = sqlx::query_as::<_, LibraryItemRow>( "SELECT * FROM library_items WHERE id = ?" ) .bind(id) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; Ok(row.map(Into::into)) } async fn list_collections(&self, provider_id: Option<&str>) -> DomainResult> { let rows: Vec<(String, Option, Option)> = if let Some(p) = provider_id { sqlx::query_as::<_, (String, Option, Option)>( "SELECT DISTINCT collection_id, collection_name, collection_type FROM library_items WHERE collection_id IS NOT NULL AND provider_id = ? ORDER BY collection_name ASC" ).bind(p).fetch_all(&self.pool).await } else { sqlx::query_as::<_, (String, Option, Option)>( "SELECT DISTINCT collection_id, collection_name, collection_type FROM library_items WHERE collection_id IS NOT NULL ORDER BY collection_name ASC" ).fetch_all(&self.pool).await }.map_err(|e| DomainError::InfrastructureError(e.to_string()))?; Ok(rows.into_iter().map(|(id, name, ct)| LibraryCollection { id, name: name.unwrap_or_default(), collection_type: ct, }).collect()) } async fn list_series(&self, provider_id: Option<&str>) -> DomainResult> { let rows: Vec<(String,)> = if let Some(p) = provider_id { sqlx::query_as( "SELECT DISTINCT series_name FROM library_items WHERE series_name IS NOT NULL AND provider_id = ? ORDER BY series_name ASC" ).bind(p).fetch_all(&self.pool).await } else { sqlx::query_as( "SELECT DISTINCT series_name FROM library_items WHERE series_name IS NOT NULL ORDER BY series_name ASC" ).fetch_all(&self.pool).await }.map_err(|e| DomainError::InfrastructureError(e.to_string()))?; Ok(rows.into_iter().map(|(s,)| s).collect()) } async fn list_genres(&self, content_type: Option<&ContentType>, provider_id: Option<&str>) -> DomainResult> { let sql = match (content_type, provider_id) { (Some(ct), Some(p)) => format!( "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je WHERE li.content_type = '{}' AND li.provider_id = '{}' ORDER BY je.value ASC", content_type_str(ct), p.replace('\'', "''") ), (Some(ct), None) => format!( "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je WHERE li.content_type = '{}' ORDER BY je.value ASC", content_type_str(ct) ), (None, Some(p)) => format!( "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je WHERE li.provider_id = '{}' ORDER BY je.value ASC", p.replace('\'', "''") ), (None, None) => "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je ORDER BY je.value ASC".to_string(), }; let rows: Vec<(String,)> = sqlx::query_as(&sql) .fetch_all(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; Ok(rows.into_iter().map(|(s,)| s).collect()) } async fn upsert_items(&self, _provider_id: &str, items: Vec) -> DomainResult<()> { let mut tx = self.pool.begin().await.map_err(|e| DomainError::InfrastructureError(e.to_string()))?; for item in items { sqlx::query( "INSERT OR REPLACE INTO library_items (id, provider_id, external_id, title, content_type, duration_secs, series_name, season_number, episode_number, year, genres, tags, collection_id, collection_name, collection_type, thumbnail_url, synced_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" ) .bind(&item.id).bind(&item.provider_id).bind(&item.external_id) .bind(&item.title).bind(content_type_str(&item.content_type)) .bind(item.duration_secs as i64) .bind(&item.series_name).bind(item.season_number.map(|n| n as i64)) .bind(item.episode_number.map(|n| n as i64)) .bind(item.year.map(|n| n as i64)) .bind(serde_json::to_string(&item.genres).unwrap_or_default()) .bind(serde_json::to_string(&item.tags).unwrap_or_default()) .bind(&item.collection_id).bind(&item.collection_name) .bind(&item.collection_type).bind(&item.thumbnail_url) .bind(&item.synced_at) .execute(&mut *tx) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; } tx.commit().await.map_err(|e| DomainError::InfrastructureError(e.to_string())) } async fn clear_provider(&self, provider_id: &str) -> DomainResult<()> { sqlx::query("DELETE FROM library_items WHERE provider_id = ?") .bind(provider_id) .execute(&self.pool) .await .map(|_| ()) .map_err(|e| DomainError::InfrastructureError(e.to_string())) } async fn log_sync_start(&self, provider_id: &str) -> DomainResult { let now = chrono::Utc::now().to_rfc3339(); let id = sqlx::query_scalar::<_, i64>( "INSERT INTO library_sync_log (provider_id, started_at, status) VALUES (?, ?, 'running') RETURNING id" ) .bind(provider_id).bind(&now) .fetch_one(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; Ok(id) } async fn log_sync_finish(&self, log_id: i64, result: &LibrarySyncResult) -> DomainResult<()> { let now = chrono::Utc::now().to_rfc3339(); let status = if result.error.is_none() { "done" } else { "error" }; sqlx::query( "UPDATE library_sync_log SET finished_at = ?, items_found = ?, status = ?, error_msg = ? WHERE id = ?" ) .bind(&now).bind(result.items_found as i64) .bind(status).bind(&result.error).bind(log_id) .execute(&self.pool) .await .map(|_| ()) .map_err(|e| DomainError::InfrastructureError(e.to_string())) } async fn latest_sync_status(&self) -> DomainResult> { let rows = sqlx::query_as::<_, SyncLogRow>( "SELECT * FROM library_sync_log WHERE id IN ( SELECT MAX(id) FROM library_sync_log GROUP BY provider_id ) ORDER BY started_at DESC" ) .fetch_all(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; Ok(rows.into_iter().map(|r| LibrarySyncLogEntry { id: r.id, provider_id: r.provider_id, started_at: r.started_at, finished_at: r.finished_at, items_found: r.items_found as u32, status: r.status, error_msg: r.error_msg, }).collect()) } async fn is_sync_running(&self, provider_id: &str) -> DomainResult { let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM library_sync_log WHERE provider_id = ? AND status = 'running'" ) .bind(provider_id) .fetch_one(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; Ok(count > 0) } } // ── SQLx row types ───────────────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct LibraryItemRow { id: String, provider_id: String, external_id: String, title: String, content_type: String, duration_secs: i64, series_name: Option, season_number: Option, episode_number: Option, year: Option, genres: String, tags: String, collection_id: Option, collection_name: Option, collection_type: Option, thumbnail_url: Option, synced_at: String, } impl From for LibraryItem { fn from(r: LibraryItemRow) -> Self { Self { id: r.id, provider_id: r.provider_id, external_id: r.external_id, title: r.title, content_type: parse_content_type(&r.content_type), duration_secs: r.duration_secs as u32, series_name: r.series_name, season_number: r.season_number.map(|n| n as u32), episode_number: r.episode_number.map(|n| n as u32), year: r.year.map(|n| n as u16), genres: serde_json::from_str(&r.genres).unwrap_or_default(), tags: serde_json::from_str(&r.tags).unwrap_or_default(), collection_id: r.collection_id, collection_name: r.collection_name, collection_type: r.collection_type, thumbnail_url: r.thumbnail_url, synced_at: r.synced_at, } } } #[derive(sqlx::FromRow)] struct SyncLogRow { id: i64, provider_id: String, started_at: String, finished_at: Option, items_found: i64, status: String, error_msg: Option, } #[cfg(test)] mod tests { use super::*; use sqlx::SqlitePool; use domain::{LibraryItem, LibrarySearchFilter, ContentType}; async fn setup() -> SqlitePool { let pool = SqlitePool::connect(":memory:").await.unwrap(); sqlx::query( "CREATE TABLE library_items ( id TEXT PRIMARY KEY, provider_id TEXT NOT NULL, external_id TEXT NOT NULL, title TEXT NOT NULL, content_type TEXT NOT NULL, duration_secs INTEGER NOT NULL DEFAULT 0, series_name TEXT, season_number INTEGER, episode_number INTEGER, year INTEGER, genres TEXT NOT NULL DEFAULT '[]', tags TEXT NOT NULL DEFAULT '[]', collection_id TEXT, collection_name TEXT, collection_type TEXT, thumbnail_url TEXT, synced_at TEXT NOT NULL )" ).execute(&pool).await.unwrap(); sqlx::query( "CREATE TABLE library_sync_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, provider_id TEXT NOT NULL, started_at TEXT NOT NULL, finished_at TEXT, items_found INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'running', error_msg TEXT )" ).execute(&pool).await.unwrap(); pool } fn make_item(id: &str, provider: &str, title: &str) -> LibraryItem { LibraryItem { id: id.to_string(), provider_id: provider.to_string(), external_id: id.to_string(), title: title.to_string(), content_type: ContentType::Movie, duration_secs: 3600, series_name: None, season_number: None, episode_number: None, year: Some(2020), genres: vec!["Action".to_string()], tags: vec![], collection_id: None, collection_name: None, collection_type: None, thumbnail_url: None, synced_at: "2026-03-19T00:00:00Z".to_string(), } } #[tokio::test] async fn upsert_then_search_returns_items() { let pool = setup().await; let repo = SqliteLibraryRepository::new(pool); let items = vec![make_item("jellyfin::1", "jellyfin", "Movie A")]; repo.upsert_items("jellyfin", items).await.unwrap(); let (results, total) = repo.search(&LibrarySearchFilter { limit: 50, ..Default::default() }).await.unwrap(); assert_eq!(total, 1); assert_eq!(results[0].title, "Movie A"); } #[tokio::test] async fn clear_provider_removes_only_that_provider() { let pool = setup().await; let repo = SqliteLibraryRepository::new(pool); repo.upsert_items("jellyfin", vec![make_item("jellyfin::1", "jellyfin", "Jelly Movie")]).await.unwrap(); repo.upsert_items("local", vec![make_item("local::1", "local", "Local Movie")]).await.unwrap(); repo.clear_provider("jellyfin").await.unwrap(); let (results, _) = repo.search(&LibrarySearchFilter { limit: 50, ..Default::default() }).await.unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0].provider_id, "local"); } #[tokio::test] async fn is_sync_running_reflects_status() { let pool = setup().await; let repo = SqliteLibraryRepository::new(pool); assert!(!repo.is_sync_running("jellyfin").await.unwrap()); let log_id = repo.log_sync_start("jellyfin").await.unwrap(); assert!(repo.is_sync_running("jellyfin").await.unwrap()); let result = domain::LibrarySyncResult { provider_id: "jellyfin".to_string(), items_found: 5, duration_ms: 100, error: None, }; repo.log_sync_finish(log_id, &result).await.unwrap(); assert!(!repo.is_sync_running("jellyfin").await.unwrap()); } }