//! Full-sync library sync adapter: truncate + re-insert all provider items. use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; use domain::{ Collection, ILibraryRepository, IMediaProvider, LibraryItem, LibrarySyncAdapter, LibrarySyncResult, MediaFilter, }; pub struct FullSyncAdapter { repo: Arc, } impl FullSyncAdapter { pub fn new(repo: Arc) -> Self { Self { repo } } } #[async_trait] impl LibrarySyncAdapter for FullSyncAdapter { async fn sync_provider( &self, provider: &dyn IMediaProvider, provider_id: &str, ) -> LibrarySyncResult { let start = Instant::now(); // Check for running sync first match self.repo.is_sync_running(provider_id).await { Ok(true) => { return LibrarySyncResult { provider_id: provider_id.to_string(), items_found: 0, duration_ms: 0, error: Some("sync already running".to_string()), }; } Err(e) => { return LibrarySyncResult { provider_id: provider_id.to_string(), items_found: 0, duration_ms: 0, error: Some(e.to_string()), }; } Ok(false) => {} } let log_id = match self.repo.log_sync_start(provider_id).await { Ok(id) => id, Err(e) => { return LibrarySyncResult { provider_id: provider_id.to_string(), items_found: 0, duration_ms: start.elapsed().as_millis() as u64, error: Some(e.to_string()), }; } }; // Fetch collections for name/type enrichment — build a lookup map let collections: Vec = provider.list_collections().await.unwrap_or_default(); let collection_map: HashMap = collections.iter().map(|c| (c.id.clone(), c)).collect(); // Fetch all items let media_items = match provider.fetch_items(&MediaFilter::default()).await { Ok(items) => items, Err(e) => { let result = LibrarySyncResult { provider_id: provider_id.to_string(), items_found: 0, duration_ms: start.elapsed().as_millis() as u64, error: Some(e.to_string()), }; let _ = self.repo.log_sync_finish(log_id, &result).await; return result; } }; let items_found = media_items.len() as u32; let now = chrono::Utc::now().to_rfc3339(); let library_items: Vec = media_items .into_iter() .map(|item| { let raw_id = item.id.into_inner(); let id = format!("{}::{}", provider_id, raw_id); // Enrich with collection name/type using the lookup map. let (col_name, col_type) = item.collection_id.as_deref() .and_then(|cid| collection_map.get(cid)) .map(|c| (Some(c.name.clone()), c.collection_type.clone())) .unwrap_or((None, None)); LibraryItem { id, provider_id: provider_id.to_string(), external_id: raw_id, title: item.title, content_type: item.content_type, duration_secs: item.duration_secs, series_name: item.series_name, season_number: item.season_number, episode_number: item.episode_number, year: item.year, genres: item.genres, tags: item.tags, collection_id: item.collection_id, collection_name: col_name, collection_type: col_type, thumbnail_url: item.thumbnail_url, synced_at: now.clone(), } }) .collect(); // Truncate + insert if let Err(e) = self.repo.clear_provider(provider_id).await { let result = LibrarySyncResult { provider_id: provider_id.to_string(), items_found: 0, duration_ms: start.elapsed().as_millis() as u64, error: Some(e.to_string()), }; let _ = self.repo.log_sync_finish(log_id, &result).await; return result; } let result = match self.repo.upsert_items(provider_id, library_items).await { Ok(()) => LibrarySyncResult { provider_id: provider_id.to_string(), items_found, duration_ms: start.elapsed().as_millis() as u64, error: None, }, Err(e) => LibrarySyncResult { provider_id: provider_id.to_string(), items_found: 0, duration_ms: start.elapsed().as_millis() as u64, error: Some(e.to_string()), }, }; let _ = self.repo.log_sync_finish(log_id, &result).await; result } } #[cfg(test)] mod tests { use super::*; use std::sync::{Arc, Mutex}; use async_trait::async_trait; use domain::*; struct MockProvider { items: Vec, } #[async_trait] impl IMediaProvider for MockProvider { fn capabilities(&self) -> ProviderCapabilities { ProviderCapabilities { collections: true, series: false, genres: false, tags: false, decade: false, search: false, streaming_protocol: StreamingProtocol::Hls, rescan: false, transcode: false, } } async fn fetch_items(&self, _filter: &MediaFilter) -> DomainResult> { Ok(self.items.clone()) } async fn fetch_by_id(&self, _id: &MediaItemId) -> DomainResult> { Ok(None) } async fn get_stream_url(&self, _id: &MediaItemId, _q: &StreamQuality) -> DomainResult { Ok(String::new()) } async fn list_collections(&self) -> DomainResult> { Ok(vec![]) } async fn list_series(&self, _col: Option<&str>) -> DomainResult> { Ok(vec![]) } async fn list_genres(&self, _ct: Option<&ContentType>) -> DomainResult> { Ok(vec![]) } } struct SpyRepo { upserted: Arc>>, cleared: Arc>>, } #[async_trait] impl ILibraryRepository for SpyRepo { async fn search(&self, _f: &LibrarySearchFilter) -> DomainResult<(Vec, u32)> { Ok((vec![], 0)) } async fn get_by_id(&self, _id: &str) -> DomainResult> { Ok(None) } async fn list_collections(&self, _p: Option<&str>) -> DomainResult> { Ok(vec![]) } async fn list_series(&self, _p: Option<&str>) -> DomainResult> { Ok(vec![]) } async fn list_genres(&self, _ct: Option<&ContentType>, _p: Option<&str>) -> DomainResult> { Ok(vec![]) } async fn upsert_items(&self, _pid: &str, items: Vec) -> DomainResult<()> { self.upserted.lock().unwrap().extend(items); Ok(()) } async fn clear_provider(&self, pid: &str) -> DomainResult<()> { self.cleared.lock().unwrap().push(pid.to_string()); Ok(()) } async fn log_sync_start(&self, _pid: &str) -> DomainResult { Ok(1) } async fn log_sync_finish(&self, _id: i64, _r: &LibrarySyncResult) -> DomainResult<()> { Ok(()) } async fn latest_sync_status(&self) -> DomainResult> { Ok(vec![]) } async fn is_sync_running(&self, _pid: &str) -> DomainResult { Ok(false) } } #[tokio::test] async fn sync_clears_then_upserts_items() { let upserted = Arc::new(Mutex::new(vec![])); let cleared = Arc::new(Mutex::new(vec![])); let repo: Arc = Arc::new(SpyRepo { upserted: Arc::clone(&upserted), cleared: Arc::clone(&cleared), }); let adapter = FullSyncAdapter::new(Arc::clone(&repo)); let provider = MockProvider { items: vec![MediaItem { id: MediaItemId::new("abc".to_string()), title: "Test Movie".to_string(), content_type: ContentType::Movie, duration_secs: 3600, description: None, series_name: None, season_number: None, episode_number: None, year: None, genres: vec![], tags: vec![], thumbnail_url: None, collection_id: None, }], }; let result = adapter.sync_provider(&provider, "jellyfin").await; assert!(result.error.is_none()); assert_eq!(result.items_found, 1); assert_eq!(cleared.lock().unwrap().as_slice(), &["jellyfin"]); assert_eq!(upserted.lock().unwrap().len(), 1); } }