feat: add XMP writer plugin and enhance media upload configuration

This commit is contained in:
2025-11-02 19:22:35 +01:00
parent 13bb9e6b3e
commit 8b98df745c
11 changed files with 217 additions and 31 deletions

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};
use futures_util::StreamExt;
use libertas_core::plugins::PluginContext;
@@ -6,6 +6,7 @@ use libertas_infra::factory::{
build_album_repository, build_database_pool, build_media_repository, build_user_repository,
};
use serde::Deserialize;
use tokio::fs;
use uuid::Uuid;
use crate::{config::load_config, plugin_manager::PluginManager};
@@ -19,6 +20,11 @@ struct MediaJob {
media_id: Uuid,
}
#[derive(Deserialize)]
struct MediaDeletedJob {
storage_path: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Starting libertas worker...");
@@ -46,25 +52,42 @@ async fn main() -> anyhow::Result<()> {
println!("Connected to NATS server at {}", config.broker_url);
let mut subscriber = nats_client
let mut sub_new = nats_client
.queue_subscribe("media.new", "media_processors".to_string())
.await?;
println!("Subscribed to 'media.new' queue");
while let Some(msg) = subscriber.next().await {
let context = context.clone();
let manager = plugin_manager.clone();
tokio::spawn(async move {
if let Err(e) = process_job(msg, context, manager).await {
eprintln!("Job failed: {}", e);
}
});
}
let mut sub_deleted = nats_client
.queue_subscribe("media.deleted", "media_deleters".to_string())
.await?;
println!("Subscribed to 'media.deleted' queue");
Ok(())
loop {
tokio::select! {
Some(msg) = sub_new.next() => {
let context = context.clone();
let manager = plugin_manager.clone();
tokio::spawn(async move {
if let Err(e) = process_new_job(msg, context, manager).await {
eprintln!("Job failed: {}", e);
}
});
},
// --- ADD THIS BLOCK ---
Some(msg) = sub_deleted.next() => {
let context = context.clone();
tokio::spawn(async move {
if let Err(e) = process_deleted_job(msg, context).await {
eprintln!("Deletion job failed: {}", e);
}
});
},
}
}
}
async fn process_job(
async fn process_new_job(
msg: async_nats::Message,
context: Arc<PluginContext>,
plugin_manager: Arc<PluginManager>,
@@ -85,3 +108,17 @@ async fn process_job(
Ok(())
}
async fn process_deleted_job(
msg: async_nats::Message,
context: Arc<PluginContext>,
) -> anyhow::Result<()> {
let payload: MediaDeletedJob = serde_json::from_slice(&msg.payload)?;
let file_path = PathBuf::from(&context.media_library_path).join(&payload.storage_path);
let xmp_path = format!("{}.xmp", file_path.to_string_lossy());
if let Err(e) = fs::remove_file(xmp_path).await {
println!("Failed to delete XMP sidecar: {}", e);
}
Ok(())
}