feat(infra): add FullSyncAdapter for library sync
This commit is contained in:
247
k-tv-backend/infra/src/library_sync.rs
Normal file
247
k-tv-backend/infra/src/library_sync.rs
Normal file
@@ -0,0 +1,247 @@
|
||||
//! Full-sync library sync adapter: truncate + re-insert all provider items.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
Collection, ILibraryRepository, IMediaProvider, LibraryItem,
|
||||
LibrarySyncAdapter, LibrarySyncResult, MediaFilter,
|
||||
};
|
||||
|
||||
pub struct FullSyncAdapter {
|
||||
repo: Arc<dyn ILibraryRepository>,
|
||||
}
|
||||
|
||||
impl FullSyncAdapter {
|
||||
pub fn new(repo: Arc<dyn ILibraryRepository>) -> Self {
|
||||
Self { repo }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LibrarySyncAdapter for FullSyncAdapter {
|
||||
async fn sync_provider(
|
||||
&self,
|
||||
provider: &dyn IMediaProvider,
|
||||
provider_id: &str,
|
||||
) -> LibrarySyncResult {
|
||||
let start = Instant::now();
|
||||
|
||||
// Check for running sync first
|
||||
match self.repo.is_sync_running(provider_id).await {
|
||||
Ok(true) => {
|
||||
return LibrarySyncResult {
|
||||
provider_id: provider_id.to_string(),
|
||||
items_found: 0,
|
||||
duration_ms: 0,
|
||||
error: Some("sync already running".to_string()),
|
||||
};
|
||||
}
|
||||
Err(e) => {
|
||||
return LibrarySyncResult {
|
||||
provider_id: provider_id.to_string(),
|
||||
items_found: 0,
|
||||
duration_ms: 0,
|
||||
error: Some(e.to_string()),
|
||||
};
|
||||
}
|
||||
Ok(false) => {}
|
||||
}
|
||||
|
||||
let log_id = match self.repo.log_sync_start(provider_id).await {
|
||||
Ok(id) => id,
|
||||
Err(e) => {
|
||||
return LibrarySyncResult {
|
||||
provider_id: provider_id.to_string(),
|
||||
items_found: 0,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
error: Some(e.to_string()),
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
// Fetch collections for name/type enrichment — build a lookup map
|
||||
let collections: Vec<Collection> = provider.list_collections().await.unwrap_or_default();
|
||||
let collection_map: HashMap<String, &Collection> =
|
||||
collections.iter().map(|c| (c.id.clone(), c)).collect();
|
||||
|
||||
// Fetch all items
|
||||
let media_items = match provider.fetch_items(&MediaFilter::default()).await {
|
||||
Ok(items) => items,
|
||||
Err(e) => {
|
||||
let result = LibrarySyncResult {
|
||||
provider_id: provider_id.to_string(),
|
||||
items_found: 0,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
error: Some(e.to_string()),
|
||||
};
|
||||
let _ = self.repo.log_sync_finish(log_id, &result).await;
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
let items_found = media_items.len() as u32;
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
|
||||
let library_items: Vec<LibraryItem> = media_items
|
||||
.into_iter()
|
||||
.map(|item| {
|
||||
let raw_id = item.id.into_inner();
|
||||
let id = format!("{}::{}", provider_id, raw_id);
|
||||
// Enrich with collection name/type using the lookup map.
|
||||
let (col_name, col_type) = item.collection_id.as_deref()
|
||||
.and_then(|cid| collection_map.get(cid))
|
||||
.map(|c| (Some(c.name.clone()), c.collection_type.clone()))
|
||||
.unwrap_or((None, None));
|
||||
LibraryItem {
|
||||
id,
|
||||
provider_id: provider_id.to_string(),
|
||||
external_id: raw_id,
|
||||
title: item.title,
|
||||
content_type: item.content_type,
|
||||
duration_secs: item.duration_secs,
|
||||
series_name: item.series_name,
|
||||
season_number: item.season_number,
|
||||
episode_number: item.episode_number,
|
||||
year: item.year,
|
||||
genres: item.genres,
|
||||
tags: item.tags,
|
||||
collection_id: item.collection_id,
|
||||
collection_name: col_name,
|
||||
collection_type: col_type,
|
||||
thumbnail_url: item.thumbnail_url,
|
||||
synced_at: now.clone(),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Truncate + insert
|
||||
if let Err(e) = self.repo.clear_provider(provider_id).await {
|
||||
let result = LibrarySyncResult {
|
||||
provider_id: provider_id.to_string(),
|
||||
items_found: 0,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
error: Some(e.to_string()),
|
||||
};
|
||||
let _ = self.repo.log_sync_finish(log_id, &result).await;
|
||||
return result;
|
||||
}
|
||||
|
||||
let result = match self.repo.upsert_items(provider_id, library_items).await {
|
||||
Ok(()) => LibrarySyncResult {
|
||||
provider_id: provider_id.to_string(),
|
||||
items_found,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
error: None,
|
||||
},
|
||||
Err(e) => LibrarySyncResult {
|
||||
provider_id: provider_id.to_string(),
|
||||
items_found: 0,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
error: Some(e.to_string()),
|
||||
},
|
||||
};
|
||||
|
||||
let _ = self.repo.log_sync_finish(log_id, &result).await;
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use async_trait::async_trait;
|
||||
use domain::*;
|
||||
|
||||
struct MockProvider {
|
||||
items: Vec<MediaItem>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl IMediaProvider for MockProvider {
|
||||
fn capabilities(&self) -> ProviderCapabilities {
|
||||
ProviderCapabilities {
|
||||
collections: true,
|
||||
series: false,
|
||||
genres: false,
|
||||
tags: false,
|
||||
decade: false,
|
||||
search: false,
|
||||
streaming_protocol: StreamingProtocol::Hls,
|
||||
rescan: false,
|
||||
transcode: false,
|
||||
}
|
||||
}
|
||||
async fn fetch_items(&self, _filter: &MediaFilter) -> DomainResult<Vec<MediaItem>> {
|
||||
Ok(self.items.clone())
|
||||
}
|
||||
async fn fetch_by_id(&self, _id: &MediaItemId) -> DomainResult<Option<MediaItem>> { Ok(None) }
|
||||
async fn get_stream_url(&self, _id: &MediaItemId, _q: &StreamQuality) -> DomainResult<String> { Ok(String::new()) }
|
||||
async fn list_collections(&self) -> DomainResult<Vec<Collection>> { Ok(vec![]) }
|
||||
async fn list_series(&self, _col: Option<&str>) -> DomainResult<Vec<SeriesSummary>> { Ok(vec![]) }
|
||||
async fn list_genres(&self, _ct: Option<&ContentType>) -> DomainResult<Vec<String>> { Ok(vec![]) }
|
||||
}
|
||||
|
||||
struct SpyRepo {
|
||||
upserted: Arc<Mutex<Vec<LibraryItem>>>,
|
||||
cleared: Arc<Mutex<Vec<String>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ILibraryRepository for SpyRepo {
|
||||
async fn search(&self, _f: &LibrarySearchFilter) -> DomainResult<(Vec<LibraryItem>, u32)> { Ok((vec![], 0)) }
|
||||
async fn get_by_id(&self, _id: &str) -> DomainResult<Option<LibraryItem>> { Ok(None) }
|
||||
async fn list_collections(&self, _p: Option<&str>) -> DomainResult<Vec<LibraryCollection>> { Ok(vec![]) }
|
||||
async fn list_series(&self, _p: Option<&str>) -> DomainResult<Vec<String>> { Ok(vec![]) }
|
||||
async fn list_genres(&self, _ct: Option<&ContentType>, _p: Option<&str>) -> DomainResult<Vec<String>> { Ok(vec![]) }
|
||||
async fn upsert_items(&self, _pid: &str, items: Vec<LibraryItem>) -> DomainResult<()> {
|
||||
self.upserted.lock().unwrap().extend(items);
|
||||
Ok(())
|
||||
}
|
||||
async fn clear_provider(&self, pid: &str) -> DomainResult<()> {
|
||||
self.cleared.lock().unwrap().push(pid.to_string());
|
||||
Ok(())
|
||||
}
|
||||
async fn log_sync_start(&self, _pid: &str) -> DomainResult<i64> { Ok(1) }
|
||||
async fn log_sync_finish(&self, _id: i64, _r: &LibrarySyncResult) -> DomainResult<()> { Ok(()) }
|
||||
async fn latest_sync_status(&self) -> DomainResult<Vec<LibrarySyncLogEntry>> { Ok(vec![]) }
|
||||
async fn is_sync_running(&self, _pid: &str) -> DomainResult<bool> { Ok(false) }
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sync_clears_then_upserts_items() {
|
||||
let upserted = Arc::new(Mutex::new(vec![]));
|
||||
let cleared = Arc::new(Mutex::new(vec![]));
|
||||
let repo: Arc<dyn ILibraryRepository> = Arc::new(SpyRepo {
|
||||
upserted: Arc::clone(&upserted),
|
||||
cleared: Arc::clone(&cleared),
|
||||
});
|
||||
let adapter = FullSyncAdapter::new(Arc::clone(&repo));
|
||||
let provider = MockProvider {
|
||||
items: vec![MediaItem {
|
||||
id: MediaItemId::new("abc".to_string()),
|
||||
title: "Test Movie".to_string(),
|
||||
content_type: ContentType::Movie,
|
||||
duration_secs: 3600,
|
||||
description: None,
|
||||
series_name: None,
|
||||
season_number: None,
|
||||
episode_number: None,
|
||||
year: None,
|
||||
genres: vec![],
|
||||
tags: vec![],
|
||||
thumbnail_url: None,
|
||||
collection_id: None,
|
||||
}],
|
||||
};
|
||||
|
||||
let result = adapter.sync_provider(&provider, "jellyfin").await;
|
||||
assert!(result.error.is_none());
|
||||
assert_eq!(result.items_found, 1);
|
||||
assert_eq!(cleared.lock().unwrap().as_slice(), &["jellyfin"]);
|
||||
assert_eq!(upserted.lock().unwrap().len(), 1);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user