From 64138b07e41aa71b6542ca0948e4648ea791e75d Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 20 Mar 2026 00:19:45 +0100 Subject: [PATCH] feat(infra): add FullSyncAdapter for library sync --- k-tv-backend/infra/src/lib.rs | 3 + k-tv-backend/infra/src/library_sync.rs | 247 +++++++++++++++++++++++++ 2 files changed, 250 insertions(+) create mode 100644 k-tv-backend/infra/src/library_sync.rs diff --git a/k-tv-backend/infra/src/lib.rs b/k-tv-backend/infra/src/lib.rs index bb3202f..714b86d 100644 --- a/k-tv-backend/infra/src/lib.rs +++ b/k-tv-backend/infra/src/lib.rs @@ -18,6 +18,9 @@ pub mod db; pub mod factory; pub mod jellyfin; pub mod provider_registry; +mod library_sync; +pub use library_sync::FullSyncAdapter; + mod app_settings_repository; mod activity_log_repository; mod channel_repository; diff --git a/k-tv-backend/infra/src/library_sync.rs b/k-tv-backend/infra/src/library_sync.rs new file mode 100644 index 0000000..cf2478a --- /dev/null +++ b/k-tv-backend/infra/src/library_sync.rs @@ -0,0 +1,247 @@ +//! Full-sync library sync adapter: truncate + re-insert all provider items. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +use async_trait::async_trait; +use domain::{ + Collection, ILibraryRepository, IMediaProvider, LibraryItem, + LibrarySyncAdapter, LibrarySyncResult, MediaFilter, +}; + +pub struct FullSyncAdapter { + repo: Arc, +} + +impl FullSyncAdapter { + pub fn new(repo: Arc) -> Self { + Self { repo } + } +} + +#[async_trait] +impl LibrarySyncAdapter for FullSyncAdapter { + async fn sync_provider( + &self, + provider: &dyn IMediaProvider, + provider_id: &str, + ) -> LibrarySyncResult { + let start = Instant::now(); + + // Check for running sync first + match self.repo.is_sync_running(provider_id).await { + Ok(true) => { + return LibrarySyncResult { + provider_id: provider_id.to_string(), + items_found: 0, + duration_ms: 0, + error: Some("sync already running".to_string()), + }; + } + Err(e) => { + return LibrarySyncResult { + provider_id: provider_id.to_string(), + items_found: 0, + duration_ms: 0, + error: Some(e.to_string()), + }; + } + Ok(false) => {} + } + + let log_id = match self.repo.log_sync_start(provider_id).await { + Ok(id) => id, + Err(e) => { + return LibrarySyncResult { + provider_id: provider_id.to_string(), + items_found: 0, + duration_ms: start.elapsed().as_millis() as u64, + error: Some(e.to_string()), + }; + } + }; + + // Fetch collections for name/type enrichment — build a lookup map + let collections: Vec = provider.list_collections().await.unwrap_or_default(); + let collection_map: HashMap = + collections.iter().map(|c| (c.id.clone(), c)).collect(); + + // Fetch all items + let media_items = match provider.fetch_items(&MediaFilter::default()).await { + Ok(items) => items, + Err(e) => { + let result = LibrarySyncResult { + provider_id: provider_id.to_string(), + items_found: 0, + duration_ms: start.elapsed().as_millis() as u64, + error: Some(e.to_string()), + }; + let _ = self.repo.log_sync_finish(log_id, &result).await; + return result; + } + }; + + let items_found = media_items.len() as u32; + let now = chrono::Utc::now().to_rfc3339(); + + let library_items: Vec = media_items + .into_iter() + .map(|item| { + let raw_id = item.id.into_inner(); + let id = format!("{}::{}", provider_id, raw_id); + // Enrich with collection name/type using the lookup map. + let (col_name, col_type) = item.collection_id.as_deref() + .and_then(|cid| collection_map.get(cid)) + .map(|c| (Some(c.name.clone()), c.collection_type.clone())) + .unwrap_or((None, None)); + LibraryItem { + id, + provider_id: provider_id.to_string(), + external_id: raw_id, + title: item.title, + content_type: item.content_type, + duration_secs: item.duration_secs, + series_name: item.series_name, + season_number: item.season_number, + episode_number: item.episode_number, + year: item.year, + genres: item.genres, + tags: item.tags, + collection_id: item.collection_id, + collection_name: col_name, + collection_type: col_type, + thumbnail_url: item.thumbnail_url, + synced_at: now.clone(), + } + }) + .collect(); + + // Truncate + insert + if let Err(e) = self.repo.clear_provider(provider_id).await { + let result = LibrarySyncResult { + provider_id: provider_id.to_string(), + items_found: 0, + duration_ms: start.elapsed().as_millis() as u64, + error: Some(e.to_string()), + }; + let _ = self.repo.log_sync_finish(log_id, &result).await; + return result; + } + + let result = match self.repo.upsert_items(provider_id, library_items).await { + Ok(()) => LibrarySyncResult { + provider_id: provider_id.to_string(), + items_found, + duration_ms: start.elapsed().as_millis() as u64, + error: None, + }, + Err(e) => LibrarySyncResult { + provider_id: provider_id.to_string(), + items_found: 0, + duration_ms: start.elapsed().as_millis() as u64, + error: Some(e.to_string()), + }, + }; + + let _ = self.repo.log_sync_finish(log_id, &result).await; + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + use async_trait::async_trait; + use domain::*; + + struct MockProvider { + items: Vec, + } + + #[async_trait] + impl IMediaProvider for MockProvider { + fn capabilities(&self) -> ProviderCapabilities { + ProviderCapabilities { + collections: true, + series: false, + genres: false, + tags: false, + decade: false, + search: false, + streaming_protocol: StreamingProtocol::Hls, + rescan: false, + transcode: false, + } + } + async fn fetch_items(&self, _filter: &MediaFilter) -> DomainResult> { + Ok(self.items.clone()) + } + async fn fetch_by_id(&self, _id: &MediaItemId) -> DomainResult> { Ok(None) } + async fn get_stream_url(&self, _id: &MediaItemId, _q: &StreamQuality) -> DomainResult { Ok(String::new()) } + async fn list_collections(&self) -> DomainResult> { Ok(vec![]) } + async fn list_series(&self, _col: Option<&str>) -> DomainResult> { Ok(vec![]) } + async fn list_genres(&self, _ct: Option<&ContentType>) -> DomainResult> { Ok(vec![]) } + } + + struct SpyRepo { + upserted: Arc>>, + cleared: Arc>>, + } + + #[async_trait] + impl ILibraryRepository for SpyRepo { + async fn search(&self, _f: &LibrarySearchFilter) -> DomainResult<(Vec, u32)> { Ok((vec![], 0)) } + async fn get_by_id(&self, _id: &str) -> DomainResult> { Ok(None) } + async fn list_collections(&self, _p: Option<&str>) -> DomainResult> { Ok(vec![]) } + async fn list_series(&self, _p: Option<&str>) -> DomainResult> { Ok(vec![]) } + async fn list_genres(&self, _ct: Option<&ContentType>, _p: Option<&str>) -> DomainResult> { Ok(vec![]) } + async fn upsert_items(&self, _pid: &str, items: Vec) -> DomainResult<()> { + self.upserted.lock().unwrap().extend(items); + Ok(()) + } + async fn clear_provider(&self, pid: &str) -> DomainResult<()> { + self.cleared.lock().unwrap().push(pid.to_string()); + Ok(()) + } + async fn log_sync_start(&self, _pid: &str) -> DomainResult { Ok(1) } + async fn log_sync_finish(&self, _id: i64, _r: &LibrarySyncResult) -> DomainResult<()> { Ok(()) } + async fn latest_sync_status(&self) -> DomainResult> { Ok(vec![]) } + async fn is_sync_running(&self, _pid: &str) -> DomainResult { Ok(false) } + } + + #[tokio::test] + async fn sync_clears_then_upserts_items() { + let upserted = Arc::new(Mutex::new(vec![])); + let cleared = Arc::new(Mutex::new(vec![])); + let repo: Arc = Arc::new(SpyRepo { + upserted: Arc::clone(&upserted), + cleared: Arc::clone(&cleared), + }); + let adapter = FullSyncAdapter::new(Arc::clone(&repo)); + let provider = MockProvider { + items: vec![MediaItem { + id: MediaItemId::new("abc".to_string()), + title: "Test Movie".to_string(), + content_type: ContentType::Movie, + duration_secs: 3600, + description: None, + series_name: None, + season_number: None, + episode_number: None, + year: None, + genres: vec![], + tags: vec![], + thumbnail_url: None, + collection_id: None, + }], + }; + + let result = adapter.sync_provider(&provider, "jellyfin").await; + assert!(result.error.is_none()); + assert_eq!(result.items_found, 1); + assert_eq!(cleared.lock().unwrap().as_slice(), &["jellyfin"]); + assert_eq!(upserted.lock().unwrap().len(), 1); + } +}