use std::{path::PathBuf, sync::Arc}; use futures_util::StreamExt; use libertas_core::plugins::PluginContext; use libertas_infra::factory::{ build_album_repository, build_database_pool, build_media_metadata_repository, build_media_repository, build_user_repository }; use serde::Deserialize; use tokio::fs; use uuid::Uuid; use crate::{config::load_config, plugin_manager::PluginManager}; pub mod config; pub mod plugin_manager; pub mod plugins; #[derive(Deserialize)] struct MediaJob { media_id: Uuid, } #[derive(Deserialize)] struct MediaDeletedJob { storage_path: String, } #[tokio::main] async fn main() -> anyhow::Result<()> { println!("Starting libertas worker..."); let config = load_config()?; let db_pool = build_database_pool(&config.database).await?; println!("Worker connected to database."); let media_repo = build_media_repository(&config, db_pool.clone()).await?; let album_repo = build_album_repository(&config.database, db_pool.clone()).await?; let user_repo = build_user_repository(&config.database, db_pool.clone()).await?; let metadata_repo = build_media_metadata_repository(&config.database, db_pool.clone()).await?; let context = Arc::new(PluginContext { media_repo, album_repo, user_repo, metadata_repo, media_library_path: config.media_library_path.clone(), config: Arc::new(config.clone()), }); println!("Plugin context created."); let plugin_manager = Arc::new(PluginManager::new()); let nats_client = async_nats::connect(&config.broker_url).await?; println!("Connected to NATS server at {}", config.broker_url); let mut sub_new = nats_client .queue_subscribe("media.new", "media_processors".to_string()) .await?; println!("Subscribed to 'media.new' queue"); let mut sub_deleted = nats_client .queue_subscribe("media.deleted", "media_deleters".to_string()) .await?; println!("Subscribed to 'media.deleted' queue"); loop { tokio::select! { Some(msg) = sub_new.next() => { let context = context.clone(); let manager = plugin_manager.clone(); tokio::spawn(async move { if let Err(e) = process_new_job(msg, context, manager).await { eprintln!("Job failed: {}", e); } }); }, // --- ADD THIS BLOCK --- Some(msg) = sub_deleted.next() => { let context = context.clone(); tokio::spawn(async move { if let Err(e) = process_deleted_job(msg, context).await { eprintln!("Deletion job failed: {}", e); } }); }, } } } async fn process_new_job( msg: async_nats::Message, context: Arc, plugin_manager: Arc, ) -> anyhow::Result<()> { let job: MediaJob = serde_json::from_slice(&msg.payload)?; let media = context .media_repo .find_by_id(job.media_id) .await? .ok_or_else(|| anyhow::anyhow!("Media not found: {}", job.media_id))?; println!("Processing media: {}", media.original_filename); plugin_manager.process_media(&media, &context).await; println!("Successfully processed job for media_id: {}", media.id); Ok(()) } async fn process_deleted_job( msg: async_nats::Message, context: Arc, ) -> anyhow::Result<()> { let payload: MediaDeletedJob = serde_json::from_slice(&msg.payload)?; let file_path = PathBuf::from(&context.media_library_path).join(&payload.storage_path); let xmp_path = format!("{}.xmp", file_path.to_string_lossy()); if let Err(e) = fs::remove_file(xmp_path).await { println!("Failed to delete XMP sidecar: {}", e); } Ok(()) }