feat(infra): add SqliteLibraryRepository

This commit is contained in:
2026-03-20 00:15:01 +01:00
parent c6c93766c7
commit a3a421c0ac
2 changed files with 390 additions and 0 deletions

View File

@@ -20,6 +20,7 @@ pub mod jellyfin;
pub mod provider_registry;
mod activity_log_repository;
mod channel_repository;
mod library_repository;
mod provider_config_repository;
mod schedule_repository;
mod transcode_settings_repository;
@@ -44,6 +45,8 @@ pub use provider_config_repository::SqliteProviderConfigRepository;
pub use schedule_repository::SqliteScheduleRepository;
#[cfg(feature = "sqlite")]
pub use transcode_settings_repository::SqliteTranscodeSettingsRepository;
#[cfg(feature = "sqlite")]
pub use library_repository::SqliteLibraryRepository;
pub use domain::TranscodeSettingsRepository;

View File

@@ -0,0 +1,387 @@
//! SQLite implementation of ILibraryRepository.
use async_trait::async_trait;
use sqlx::SqlitePool;
use domain::{
ContentType, DomainError, DomainResult, ILibraryRepository,
LibraryCollection, LibraryItem, LibrarySearchFilter, LibrarySyncLogEntry, LibrarySyncResult,
};
pub struct SqliteLibraryRepository {
pool: SqlitePool,
}
impl SqliteLibraryRepository {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
}
fn content_type_str(ct: &ContentType) -> &'static str {
match ct {
ContentType::Movie => "movie",
ContentType::Episode => "episode",
ContentType::Short => "short",
}
}
fn parse_content_type(s: &str) -> ContentType {
match s {
"episode" => ContentType::Episode,
"short" => ContentType::Short,
_ => ContentType::Movie,
}
}
#[async_trait]
impl ILibraryRepository for SqliteLibraryRepository {
async fn search(&self, filter: &LibrarySearchFilter) -> DomainResult<(Vec<LibraryItem>, u32)> {
let mut conditions: Vec<String> = vec![];
if let Some(ref p) = filter.provider_id {
conditions.push(format!("provider_id = '{}'", p.replace('\'', "''")));
}
if let Some(ref ct) = filter.content_type {
conditions.push(format!("content_type = '{}'", content_type_str(ct)));
}
if let Some(ref st) = filter.search_term {
conditions.push(format!("title LIKE '%{}%'", st.replace('\'', "''")));
}
if let Some(ref cid) = filter.collection_id {
conditions.push(format!("collection_id = '{}'", cid.replace('\'', "''")));
}
if let Some(decade) = filter.decade {
let end = decade + 10;
conditions.push(format!("year >= {} AND year < {}", decade, end));
}
if let Some(min) = filter.min_duration_secs {
conditions.push(format!("duration_secs >= {}", min));
}
if let Some(max) = filter.max_duration_secs {
conditions.push(format!("duration_secs <= {}", max));
}
if !filter.series_names.is_empty() {
let quoted: Vec<String> = filter.series_names.iter()
.map(|s| format!("'{}'", s.replace('\'', "''")))
.collect();
conditions.push(format!("series_name IN ({})", quoted.join(",")));
}
if !filter.genres.is_empty() {
let genre_conditions: Vec<String> = filter.genres.iter()
.map(|g| format!("EXISTS (SELECT 1 FROM json_each(library_items.genres) WHERE value = '{}')", g.replace('\'', "''")))
.collect();
conditions.push(format!("({})", genre_conditions.join(" OR ")));
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!("WHERE {}", conditions.join(" AND "))
};
let count_sql = format!("SELECT COUNT(*) FROM library_items {}", where_clause);
let total: i64 = sqlx::query_scalar(&count_sql)
.fetch_one(&self.pool)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
let items_sql = format!(
"SELECT * FROM library_items {} ORDER BY title ASC LIMIT {} OFFSET {}",
where_clause, filter.limit, filter.offset
);
let rows = sqlx::query_as::<_, LibraryItemRow>(&items_sql)
.fetch_all(&self.pool)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok((rows.into_iter().map(Into::into).collect(), total as u32))
}
async fn get_by_id(&self, id: &str) -> DomainResult<Option<LibraryItem>> {
let row = sqlx::query_as::<_, LibraryItemRow>(
"SELECT * FROM library_items WHERE id = ?"
)
.bind(id)
.fetch_optional(&self.pool)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok(row.map(Into::into))
}
async fn list_collections(&self, provider_id: Option<&str>) -> DomainResult<Vec<LibraryCollection>> {
let rows: Vec<(String, Option<String>, Option<String>)> = if let Some(p) = provider_id {
sqlx::query_as::<_, (String, Option<String>, Option<String>)>(
"SELECT DISTINCT collection_id, collection_name, collection_type
FROM library_items WHERE collection_id IS NOT NULL AND provider_id = ?
ORDER BY collection_name ASC"
).bind(p).fetch_all(&self.pool).await
} else {
sqlx::query_as::<_, (String, Option<String>, Option<String>)>(
"SELECT DISTINCT collection_id, collection_name, collection_type
FROM library_items WHERE collection_id IS NOT NULL
ORDER BY collection_name ASC"
).fetch_all(&self.pool).await
}.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok(rows.into_iter().map(|(id, name, ct)| LibraryCollection {
id,
name: name.unwrap_or_default(),
collection_type: ct,
}).collect())
}
async fn list_series(&self, provider_id: Option<&str>) -> DomainResult<Vec<String>> {
let rows: Vec<(String,)> = if let Some(p) = provider_id {
sqlx::query_as(
"SELECT DISTINCT series_name FROM library_items
WHERE series_name IS NOT NULL AND provider_id = ? ORDER BY series_name ASC"
).bind(p).fetch_all(&self.pool).await
} else {
sqlx::query_as(
"SELECT DISTINCT series_name FROM library_items
WHERE series_name IS NOT NULL ORDER BY series_name ASC"
).fetch_all(&self.pool).await
}.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok(rows.into_iter().map(|(s,)| s).collect())
}
async fn list_genres(&self, content_type: Option<&ContentType>, provider_id: Option<&str>) -> DomainResult<Vec<String>> {
let sql = match (content_type, provider_id) {
(Some(ct), Some(p)) => format!(
"SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je
WHERE li.content_type = '{}' AND li.provider_id = '{}' ORDER BY je.value ASC",
content_type_str(ct), p.replace('\'', "''")
),
(Some(ct), None) => format!(
"SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je
WHERE li.content_type = '{}' ORDER BY je.value ASC",
content_type_str(ct)
),
(None, Some(p)) => format!(
"SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je
WHERE li.provider_id = '{}' ORDER BY je.value ASC",
p.replace('\'', "''")
),
(None, None) => "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je ORDER BY je.value ASC".to_string(),
};
let rows: Vec<(String,)> = sqlx::query_as(&sql)
.fetch_all(&self.pool)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok(rows.into_iter().map(|(s,)| s).collect())
}
async fn upsert_items(&self, _provider_id: &str, items: Vec<LibraryItem>) -> DomainResult<()> {
let mut tx = self.pool.begin().await.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
for item in items {
sqlx::query(
"INSERT OR REPLACE INTO library_items
(id, provider_id, external_id, title, content_type, duration_secs,
series_name, season_number, episode_number, year, genres, tags,
collection_id, collection_name, collection_type, thumbnail_url, synced_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
)
.bind(&item.id).bind(&item.provider_id).bind(&item.external_id)
.bind(&item.title).bind(content_type_str(&item.content_type))
.bind(item.duration_secs as i64)
.bind(&item.series_name).bind(item.season_number.map(|n| n as i64))
.bind(item.episode_number.map(|n| n as i64))
.bind(item.year.map(|n| n as i64))
.bind(serde_json::to_string(&item.genres).unwrap_or_default())
.bind(serde_json::to_string(&item.tags).unwrap_or_default())
.bind(&item.collection_id).bind(&item.collection_name)
.bind(&item.collection_type).bind(&item.thumbnail_url)
.bind(&item.synced_at)
.execute(&mut *tx)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
}
tx.commit().await.map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
async fn clear_provider(&self, provider_id: &str) -> DomainResult<()> {
sqlx::query("DELETE FROM library_items WHERE provider_id = ?")
.bind(provider_id)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
async fn log_sync_start(&self, provider_id: &str) -> DomainResult<i64> {
let now = chrono::Utc::now().to_rfc3339();
let id = sqlx::query_scalar::<_, i64>(
"INSERT INTO library_sync_log (provider_id, started_at, status)
VALUES (?, ?, 'running') RETURNING id"
)
.bind(provider_id).bind(&now)
.fetch_one(&self.pool)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok(id)
}
async fn log_sync_finish(&self, log_id: i64, result: &LibrarySyncResult) -> DomainResult<()> {
let now = chrono::Utc::now().to_rfc3339();
let status = if result.error.is_none() { "done" } else { "error" };
sqlx::query(
"UPDATE library_sync_log
SET finished_at = ?, items_found = ?, status = ?, error_msg = ?
WHERE id = ?"
)
.bind(&now).bind(result.items_found as i64)
.bind(status).bind(&result.error).bind(log_id)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
async fn latest_sync_status(&self) -> DomainResult<Vec<LibrarySyncLogEntry>> {
let rows = sqlx::query_as::<_, SyncLogRow>(
"SELECT * FROM library_sync_log
WHERE id IN (
SELECT MAX(id) FROM library_sync_log GROUP BY provider_id
)
ORDER BY started_at DESC"
)
.fetch_all(&self.pool)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok(rows.into_iter().map(|r| LibrarySyncLogEntry {
id: r.id, provider_id: r.provider_id, started_at: r.started_at,
finished_at: r.finished_at, items_found: r.items_found as u32,
status: r.status, error_msg: r.error_msg,
}).collect())
}
async fn is_sync_running(&self, provider_id: &str) -> DomainResult<bool> {
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM library_sync_log WHERE provider_id = ? AND status = 'running'"
)
.bind(provider_id)
.fetch_one(&self.pool)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok(count > 0)
}
}
// ── SQLx row types ─────────────────────────────────────────────────────────
#[derive(sqlx::FromRow)]
struct LibraryItemRow {
id: String, provider_id: String, external_id: String, title: String,
content_type: String, duration_secs: i64,
series_name: Option<String>, season_number: Option<i64>, episode_number: Option<i64>,
year: Option<i64>, genres: String, tags: String,
collection_id: Option<String>, collection_name: Option<String>, collection_type: Option<String>,
thumbnail_url: Option<String>, synced_at: String,
}
impl From<LibraryItemRow> for LibraryItem {
fn from(r: LibraryItemRow) -> Self {
Self {
id: r.id, provider_id: r.provider_id, external_id: r.external_id,
title: r.title, content_type: parse_content_type(&r.content_type),
duration_secs: r.duration_secs as u32,
series_name: r.series_name,
season_number: r.season_number.map(|n| n as u32),
episode_number: r.episode_number.map(|n| n as u32),
year: r.year.map(|n| n as u16),
genres: serde_json::from_str(&r.genres).unwrap_or_default(),
tags: serde_json::from_str(&r.tags).unwrap_or_default(),
collection_id: r.collection_id, collection_name: r.collection_name,
collection_type: r.collection_type, thumbnail_url: r.thumbnail_url,
synced_at: r.synced_at,
}
}
}
#[derive(sqlx::FromRow)]
struct SyncLogRow {
id: i64, provider_id: String, started_at: String, finished_at: Option<String>,
items_found: i64, status: String, error_msg: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::SqlitePool;
use domain::{LibraryItem, LibrarySearchFilter, ContentType};
async fn setup() -> SqlitePool {
let pool = SqlitePool::connect(":memory:").await.unwrap();
sqlx::query(
"CREATE TABLE library_items (
id TEXT PRIMARY KEY, provider_id TEXT NOT NULL, external_id TEXT NOT NULL,
title TEXT NOT NULL, content_type TEXT NOT NULL, duration_secs INTEGER NOT NULL DEFAULT 0,
series_name TEXT, season_number INTEGER, episode_number INTEGER, year INTEGER,
genres TEXT NOT NULL DEFAULT '[]', tags TEXT NOT NULL DEFAULT '[]',
collection_id TEXT, collection_name TEXT, collection_type TEXT,
thumbnail_url TEXT, synced_at TEXT NOT NULL
)"
).execute(&pool).await.unwrap();
sqlx::query(
"CREATE TABLE library_sync_log (
id INTEGER PRIMARY KEY AUTOINCREMENT, provider_id TEXT NOT NULL,
started_at TEXT NOT NULL, finished_at TEXT, items_found INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'running', error_msg TEXT
)"
).execute(&pool).await.unwrap();
pool
}
fn make_item(id: &str, provider: &str, title: &str) -> LibraryItem {
LibraryItem {
id: id.to_string(), provider_id: provider.to_string(), external_id: id.to_string(),
title: title.to_string(), content_type: ContentType::Movie,
duration_secs: 3600, series_name: None, season_number: None, episode_number: None,
year: Some(2020), genres: vec!["Action".to_string()], tags: vec![],
collection_id: None, collection_name: None, collection_type: None,
thumbnail_url: None, synced_at: "2026-03-19T00:00:00Z".to_string(),
}
}
#[tokio::test]
async fn upsert_then_search_returns_items() {
let pool = setup().await;
let repo = SqliteLibraryRepository::new(pool);
let items = vec![make_item("jellyfin::1", "jellyfin", "Movie A")];
repo.upsert_items("jellyfin", items).await.unwrap();
let (results, total) = repo.search(&LibrarySearchFilter { limit: 50, ..Default::default() }).await.unwrap();
assert_eq!(total, 1);
assert_eq!(results[0].title, "Movie A");
}
#[tokio::test]
async fn clear_provider_removes_only_that_provider() {
let pool = setup().await;
let repo = SqliteLibraryRepository::new(pool);
repo.upsert_items("jellyfin", vec![make_item("jellyfin::1", "jellyfin", "Jelly Movie")]).await.unwrap();
repo.upsert_items("local", vec![make_item("local::1", "local", "Local Movie")]).await.unwrap();
repo.clear_provider("jellyfin").await.unwrap();
let (results, _) = repo.search(&LibrarySearchFilter { limit: 50, ..Default::default() }).await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].provider_id, "local");
}
#[tokio::test]
async fn is_sync_running_reflects_status() {
let pool = setup().await;
let repo = SqliteLibraryRepository::new(pool);
assert!(!repo.is_sync_running("jellyfin").await.unwrap());
let log_id = repo.log_sync_start("jellyfin").await.unwrap();
assert!(repo.is_sync_running("jellyfin").await.unwrap());
let result = domain::LibrarySyncResult {
provider_id: "jellyfin".to_string(), items_found: 5, duration_ms: 100, error: None,
};
repo.log_sync_finish(log_id, &result).await.unwrap();
assert!(!repo.is_sync_running("jellyfin").await.unwrap());
}
}