388 lines
16 KiB
Rust
388 lines
16 KiB
Rust
//! SQLite implementation of ILibraryRepository.
|
|
|
|
use async_trait::async_trait;
|
|
use sqlx::SqlitePool;
|
|
|
|
use domain::{
|
|
ContentType, DomainError, DomainResult, ILibraryRepository,
|
|
LibraryCollection, LibraryItem, LibrarySearchFilter, LibrarySyncLogEntry, LibrarySyncResult,
|
|
};
|
|
|
|
/// SQLite-backed implementation of the domain `ILibraryRepository` trait.
///
/// All queries run against the `library_items` and `library_sync_log`
/// tables through a shared `sqlx` connection pool.
pub struct SqliteLibraryRepository {
    // Connection pool handed in at construction; every query borrows it.
    pool: SqlitePool,
}
|
|
|
|
impl SqliteLibraryRepository {
|
|
pub fn new(pool: SqlitePool) -> Self {
|
|
Self { pool }
|
|
}
|
|
}
|
|
|
|
fn content_type_str(ct: &ContentType) -> &'static str {
|
|
match ct {
|
|
ContentType::Movie => "movie",
|
|
ContentType::Episode => "episode",
|
|
ContentType::Short => "short",
|
|
}
|
|
}
|
|
|
|
fn parse_content_type(s: &str) -> ContentType {
|
|
match s {
|
|
"episode" => ContentType::Episode,
|
|
"short" => ContentType::Short,
|
|
_ => ContentType::Movie,
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl ILibraryRepository for SqliteLibraryRepository {
|
|
async fn search(&self, filter: &LibrarySearchFilter) -> DomainResult<(Vec<LibraryItem>, u32)> {
|
|
let mut conditions: Vec<String> = vec![];
|
|
if let Some(ref p) = filter.provider_id {
|
|
conditions.push(format!("provider_id = '{}'", p.replace('\'', "''")));
|
|
}
|
|
if let Some(ref ct) = filter.content_type {
|
|
conditions.push(format!("content_type = '{}'", content_type_str(ct)));
|
|
}
|
|
if let Some(ref st) = filter.search_term {
|
|
conditions.push(format!("title LIKE '%{}%'", st.replace('\'', "''")));
|
|
}
|
|
if let Some(ref cid) = filter.collection_id {
|
|
conditions.push(format!("collection_id = '{}'", cid.replace('\'', "''")));
|
|
}
|
|
if let Some(decade) = filter.decade {
|
|
let end = decade + 10;
|
|
conditions.push(format!("year >= {} AND year < {}", decade, end));
|
|
}
|
|
if let Some(min) = filter.min_duration_secs {
|
|
conditions.push(format!("duration_secs >= {}", min));
|
|
}
|
|
if let Some(max) = filter.max_duration_secs {
|
|
conditions.push(format!("duration_secs <= {}", max));
|
|
}
|
|
if !filter.series_names.is_empty() {
|
|
let quoted: Vec<String> = filter.series_names.iter()
|
|
.map(|s| format!("'{}'", s.replace('\'', "''")))
|
|
.collect();
|
|
conditions.push(format!("series_name IN ({})", quoted.join(",")));
|
|
}
|
|
if !filter.genres.is_empty() {
|
|
let genre_conditions: Vec<String> = filter.genres.iter()
|
|
.map(|g| format!("EXISTS (SELECT 1 FROM json_each(library_items.genres) WHERE value = '{}')", g.replace('\'', "''")))
|
|
.collect();
|
|
conditions.push(format!("({})", genre_conditions.join(" OR ")));
|
|
}
|
|
|
|
let where_clause = if conditions.is_empty() {
|
|
String::new()
|
|
} else {
|
|
format!("WHERE {}", conditions.join(" AND "))
|
|
};
|
|
|
|
let count_sql = format!("SELECT COUNT(*) FROM library_items {}", where_clause);
|
|
let total: i64 = sqlx::query_scalar(&count_sql)
|
|
.fetch_one(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
|
|
let items_sql = format!(
|
|
"SELECT * FROM library_items {} ORDER BY title ASC LIMIT {} OFFSET {}",
|
|
where_clause, filter.limit, filter.offset
|
|
);
|
|
|
|
let rows = sqlx::query_as::<_, LibraryItemRow>(&items_sql)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
|
|
Ok((rows.into_iter().map(Into::into).collect(), total as u32))
|
|
}
|
|
|
|
async fn get_by_id(&self, id: &str) -> DomainResult<Option<LibraryItem>> {
|
|
let row = sqlx::query_as::<_, LibraryItemRow>(
|
|
"SELECT * FROM library_items WHERE id = ?"
|
|
)
|
|
.bind(id)
|
|
.fetch_optional(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
Ok(row.map(Into::into))
|
|
}
|
|
|
|
async fn list_collections(&self, provider_id: Option<&str>) -> DomainResult<Vec<LibraryCollection>> {
|
|
let rows: Vec<(String, Option<String>, Option<String>)> = if let Some(p) = provider_id {
|
|
sqlx::query_as::<_, (String, Option<String>, Option<String>)>(
|
|
"SELECT DISTINCT collection_id, collection_name, collection_type
|
|
FROM library_items WHERE collection_id IS NOT NULL AND provider_id = ?
|
|
ORDER BY collection_name ASC"
|
|
).bind(p).fetch_all(&self.pool).await
|
|
} else {
|
|
sqlx::query_as::<_, (String, Option<String>, Option<String>)>(
|
|
"SELECT DISTINCT collection_id, collection_name, collection_type
|
|
FROM library_items WHERE collection_id IS NOT NULL
|
|
ORDER BY collection_name ASC"
|
|
).fetch_all(&self.pool).await
|
|
}.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
|
|
Ok(rows.into_iter().map(|(id, name, ct)| LibraryCollection {
|
|
id,
|
|
name: name.unwrap_or_default(),
|
|
collection_type: ct,
|
|
}).collect())
|
|
}
|
|
|
|
async fn list_series(&self, provider_id: Option<&str>) -> DomainResult<Vec<String>> {
|
|
let rows: Vec<(String,)> = if let Some(p) = provider_id {
|
|
sqlx::query_as(
|
|
"SELECT DISTINCT series_name FROM library_items
|
|
WHERE series_name IS NOT NULL AND provider_id = ? ORDER BY series_name ASC"
|
|
).bind(p).fetch_all(&self.pool).await
|
|
} else {
|
|
sqlx::query_as(
|
|
"SELECT DISTINCT series_name FROM library_items
|
|
WHERE series_name IS NOT NULL ORDER BY series_name ASC"
|
|
).fetch_all(&self.pool).await
|
|
}.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
|
|
Ok(rows.into_iter().map(|(s,)| s).collect())
|
|
}
|
|
|
|
async fn list_genres(&self, content_type: Option<&ContentType>, provider_id: Option<&str>) -> DomainResult<Vec<String>> {
|
|
let sql = match (content_type, provider_id) {
|
|
(Some(ct), Some(p)) => format!(
|
|
"SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je
|
|
WHERE li.content_type = '{}' AND li.provider_id = '{}' ORDER BY je.value ASC",
|
|
content_type_str(ct), p.replace('\'', "''")
|
|
),
|
|
(Some(ct), None) => format!(
|
|
"SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je
|
|
WHERE li.content_type = '{}' ORDER BY je.value ASC",
|
|
content_type_str(ct)
|
|
),
|
|
(None, Some(p)) => format!(
|
|
"SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je
|
|
WHERE li.provider_id = '{}' ORDER BY je.value ASC",
|
|
p.replace('\'', "''")
|
|
),
|
|
(None, None) => "SELECT DISTINCT je.value FROM library_items li, json_each(li.genres) je ORDER BY je.value ASC".to_string(),
|
|
};
|
|
let rows: Vec<(String,)> = sqlx::query_as(&sql)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
Ok(rows.into_iter().map(|(s,)| s).collect())
|
|
}
|
|
|
|
async fn upsert_items(&self, _provider_id: &str, items: Vec<LibraryItem>) -> DomainResult<()> {
|
|
let mut tx = self.pool.begin().await.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
for item in items {
|
|
sqlx::query(
|
|
"INSERT OR REPLACE INTO library_items
|
|
(id, provider_id, external_id, title, content_type, duration_secs,
|
|
series_name, season_number, episode_number, year, genres, tags,
|
|
collection_id, collection_name, collection_type, thumbnail_url, synced_at)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
|
|
)
|
|
.bind(&item.id).bind(&item.provider_id).bind(&item.external_id)
|
|
.bind(&item.title).bind(content_type_str(&item.content_type))
|
|
.bind(item.duration_secs as i64)
|
|
.bind(&item.series_name).bind(item.season_number.map(|n| n as i64))
|
|
.bind(item.episode_number.map(|n| n as i64))
|
|
.bind(item.year.map(|n| n as i64))
|
|
.bind(serde_json::to_string(&item.genres).unwrap_or_default())
|
|
.bind(serde_json::to_string(&item.tags).unwrap_or_default())
|
|
.bind(&item.collection_id).bind(&item.collection_name)
|
|
.bind(&item.collection_type).bind(&item.thumbnail_url)
|
|
.bind(&item.synced_at)
|
|
.execute(&mut *tx)
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
}
|
|
tx.commit().await.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
|
}
|
|
|
|
async fn clear_provider(&self, provider_id: &str) -> DomainResult<()> {
|
|
sqlx::query("DELETE FROM library_items WHERE provider_id = ?")
|
|
.bind(provider_id)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map(|_| ())
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
|
}
|
|
|
|
async fn log_sync_start(&self, provider_id: &str) -> DomainResult<i64> {
|
|
let now = chrono::Utc::now().to_rfc3339();
|
|
let id = sqlx::query_scalar::<_, i64>(
|
|
"INSERT INTO library_sync_log (provider_id, started_at, status)
|
|
VALUES (?, ?, 'running') RETURNING id"
|
|
)
|
|
.bind(provider_id).bind(&now)
|
|
.fetch_one(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
Ok(id)
|
|
}
|
|
|
|
async fn log_sync_finish(&self, log_id: i64, result: &LibrarySyncResult) -> DomainResult<()> {
|
|
let now = chrono::Utc::now().to_rfc3339();
|
|
let status = if result.error.is_none() { "done" } else { "error" };
|
|
sqlx::query(
|
|
"UPDATE library_sync_log
|
|
SET finished_at = ?, items_found = ?, status = ?, error_msg = ?
|
|
WHERE id = ?"
|
|
)
|
|
.bind(&now).bind(result.items_found as i64)
|
|
.bind(status).bind(&result.error).bind(log_id)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map(|_| ())
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
|
}
|
|
|
|
async fn latest_sync_status(&self) -> DomainResult<Vec<LibrarySyncLogEntry>> {
|
|
let rows = sqlx::query_as::<_, SyncLogRow>(
|
|
"SELECT * FROM library_sync_log
|
|
WHERE id IN (
|
|
SELECT MAX(id) FROM library_sync_log GROUP BY provider_id
|
|
)
|
|
ORDER BY started_at DESC"
|
|
)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
|
|
Ok(rows.into_iter().map(|r| LibrarySyncLogEntry {
|
|
id: r.id, provider_id: r.provider_id, started_at: r.started_at,
|
|
finished_at: r.finished_at, items_found: r.items_found as u32,
|
|
status: r.status, error_msg: r.error_msg,
|
|
}).collect())
|
|
}
|
|
|
|
async fn is_sync_running(&self, provider_id: &str) -> DomainResult<bool> {
|
|
let count: i64 = sqlx::query_scalar(
|
|
"SELECT COUNT(*) FROM library_sync_log WHERE provider_id = ? AND status = 'running'"
|
|
)
|
|
.bind(provider_id)
|
|
.fetch_one(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
|
Ok(count > 0)
|
|
}
|
|
}
|
|
|
|
// ── SQLx row types ─────────────────────────────────────────────────────────
|
|
|
|
/// Raw row shape of the `library_items` table.
///
/// Field names must match column names exactly — `sqlx::FromRow` maps by
/// name. `genres`/`tags` hold JSON-encoded string arrays; `synced_at` is an
/// RFC 3339 timestamp string.
#[derive(sqlx::FromRow)]
struct LibraryItemRow {
    id: String, provider_id: String, external_id: String, title: String,
    content_type: String, duration_secs: i64,
    series_name: Option<String>, season_number: Option<i64>, episode_number: Option<i64>,
    year: Option<i64>, genres: String, tags: String,
    collection_id: Option<String>, collection_name: Option<String>, collection_type: Option<String>,
    thumbnail_url: Option<String>, synced_at: String,
}
|
|
|
|
impl From<LibraryItemRow> for LibraryItem {
|
|
fn from(r: LibraryItemRow) -> Self {
|
|
Self {
|
|
id: r.id, provider_id: r.provider_id, external_id: r.external_id,
|
|
title: r.title, content_type: parse_content_type(&r.content_type),
|
|
duration_secs: r.duration_secs as u32,
|
|
series_name: r.series_name,
|
|
season_number: r.season_number.map(|n| n as u32),
|
|
episode_number: r.episode_number.map(|n| n as u32),
|
|
year: r.year.map(|n| n as u16),
|
|
genres: serde_json::from_str(&r.genres).unwrap_or_default(),
|
|
tags: serde_json::from_str(&r.tags).unwrap_or_default(),
|
|
collection_id: r.collection_id, collection_name: r.collection_name,
|
|
collection_type: r.collection_type, thumbnail_url: r.thumbnail_url,
|
|
synced_at: r.synced_at,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Raw row shape of the `library_sync_log` table.
///
/// Field names must match column names exactly — `sqlx::FromRow` maps by
/// name. `started_at`/`finished_at` are RFC 3339 timestamp strings.
#[derive(sqlx::FromRow)]
struct SyncLogRow {
    id: i64, provider_id: String, started_at: String, finished_at: Option<String>,
    items_found: i64, status: String, error_msg: Option<String>,
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::SqlitePool;
    use domain::{LibraryItem, LibrarySearchFilter, ContentType};

    /// Creates an in-memory SQLite pool with both repository tables.
    // NOTE(review): connection string is ":memory:" without the "sqlite:"
    // scheme — sqlx appears to accept this, but confirm against the pinned
    // sqlx version ("sqlite::memory:" is the documented form).
    async fn setup() -> SqlitePool {
        let pool = SqlitePool::connect(":memory:").await.unwrap();
        sqlx::query(
            "CREATE TABLE library_items (
                id TEXT PRIMARY KEY, provider_id TEXT NOT NULL, external_id TEXT NOT NULL,
                title TEXT NOT NULL, content_type TEXT NOT NULL, duration_secs INTEGER NOT NULL DEFAULT 0,
                series_name TEXT, season_number INTEGER, episode_number INTEGER, year INTEGER,
                genres TEXT NOT NULL DEFAULT '[]', tags TEXT NOT NULL DEFAULT '[]',
                collection_id TEXT, collection_name TEXT, collection_type TEXT,
                thumbnail_url TEXT, synced_at TEXT NOT NULL
            )"
        ).execute(&pool).await.unwrap();
        sqlx::query(
            "CREATE TABLE library_sync_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT, provider_id TEXT NOT NULL,
                started_at TEXT NOT NULL, finished_at TEXT, items_found INTEGER NOT NULL DEFAULT 0,
                status TEXT NOT NULL DEFAULT 'running', error_msg TEXT
            )"
        ).execute(&pool).await.unwrap();
        pool
    }

    /// Builds a minimal movie item fixture: one genre, year 2020, 1 hour.
    fn make_item(id: &str, provider: &str, title: &str) -> LibraryItem {
        LibraryItem {
            id: id.to_string(), provider_id: provider.to_string(), external_id: id.to_string(),
            title: title.to_string(), content_type: ContentType::Movie,
            duration_secs: 3600, series_name: None, season_number: None, episode_number: None,
            year: Some(2020), genres: vec!["Action".to_string()], tags: vec![],
            collection_id: None, collection_name: None, collection_type: None,
            thumbnail_url: None, synced_at: "2026-03-19T00:00:00Z".to_string(),
        }
    }

    /// Round-trip: an upserted item is found by an unfiltered search and
    /// the total count matches.
    #[tokio::test]
    async fn upsert_then_search_returns_items() {
        let pool = setup().await;
        let repo = SqliteLibraryRepository::new(pool);
        let items = vec![make_item("jellyfin::1", "jellyfin", "Movie A")];
        repo.upsert_items("jellyfin", items).await.unwrap();

        let (results, total) = repo.search(&LibrarySearchFilter { limit: 50, ..Default::default() }).await.unwrap();
        assert_eq!(total, 1);
        assert_eq!(results[0].title, "Movie A");
    }

    /// `clear_provider` must delete only the targeted provider's items,
    /// leaving other providers untouched.
    #[tokio::test]
    async fn clear_provider_removes_only_that_provider() {
        let pool = setup().await;
        let repo = SqliteLibraryRepository::new(pool);
        repo.upsert_items("jellyfin", vec![make_item("jellyfin::1", "jellyfin", "Jelly Movie")]).await.unwrap();
        repo.upsert_items("local", vec![make_item("local::1", "local", "Local Movie")]).await.unwrap();
        repo.clear_provider("jellyfin").await.unwrap();

        let (results, _) = repo.search(&LibrarySearchFilter { limit: 50, ..Default::default() }).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].provider_id, "local");
    }

    /// Sync status lifecycle: false before start, true while 'running',
    /// false again after `log_sync_finish` records a successful result.
    #[tokio::test]
    async fn is_sync_running_reflects_status() {
        let pool = setup().await;
        let repo = SqliteLibraryRepository::new(pool);
        assert!(!repo.is_sync_running("jellyfin").await.unwrap());
        let log_id = repo.log_sync_start("jellyfin").await.unwrap();
        assert!(repo.is_sync_running("jellyfin").await.unwrap());
        let result = domain::LibrarySyncResult {
            provider_id: "jellyfin".to_string(), items_found: 5, duration_ms: 100, error: None,
        };
        repo.log_sync_finish(log_id, &result).await.unwrap();
        assert!(!repo.is_sync_running("jellyfin").await.unwrap());
    }
}
|