diff --git a/Cargo.lock b/Cargo.lock index 6d6cc44..d05d3ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -270,6 +270,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37521ac7aabe3d13122dc382493e20c9416f299d2ccd5b3a5340a2570cdeb0f3" dependencies = [ "find-msvc-tools", + "jobserver", + "libc", "shlex", ] @@ -628,6 +630,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures" version = "0.3.31" @@ -1114,6 +1122,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + [[package]] name = "js-sys" version = "0.3.82" @@ -1235,6 +1253,7 @@ dependencies = [ "sqlx", "tokio", "uuid", + "xmp_toolkit", ] [[package]] @@ -1489,6 +1508,28 @@ dependencies = [ "libm", ] +[[package]] +name = "num_enum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1207a7e20ad57b847bbddc6776b968420d38292bbfe2089accff5e19e82454c" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1688,6 +1729,15 @@ dependencies = [ "elliptic-curve", ] +[[package]] +name = "proc-macro-crate" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro2" version = "1.0.103" @@ -2623,6 +2673,36 @@ dependencies = [ "webpki-roots 0.26.11", ] +[[package]] +name = "toml_datetime" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_edit" +version = "0.23.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" +dependencies = [ + "indexmap", + "toml_datetime", + "toml_parser", + "winnow", +] + +[[package]] +name = "toml_parser" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +dependencies = [ + "winnow", +] + [[package]] name = "tower" version = "0.5.2" @@ -3217,6 +3297,15 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen" version = "0.46.0" @@ -3229,6 +3318,18 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xmp_toolkit" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ee890a0f3c8c4ef2a6fbd098652642e1fc9ea4c622763169d64dbe7b3beaf7" +dependencies = [ + "cc", + "fs_extra", + "num_enum", + "thiserror 2.0.17", +] + [[package]] name = "yoke" version = "0.8.1" diff --git a/libertas_api/src/config.rs b/libertas_api/src/config.rs index b57de04..a239a55 100644 --- a/libertas_api/src/config.rs +++ b/libertas_api/src/config.rs @@ -13,5 +13,6 @@ pub fn load_config() -> CoreResult { jwt_secret: "super_secret_jwt_key".to_string(), media_library_path: "media_library".to_string(), broker_url: "nats://localhost:4222".to_string(), + max_upload_size_mb: Some(100), }) } diff --git a/libertas_api/src/handlers/media_handlers.rs b/libertas_api/src/handlers/media_handlers.rs index 13d8adc..4469e16 100644 --- a/libertas_api/src/handlers/media_handlers.rs +++ b/libertas_api/src/handlers/media_handlers.rs @@ -38,11 +38,13 @@ impl From for MediaResponse { } pub fn media_routes() -> Router { + let max_size_mb = 100; // todo: get from config + Router::new() .route("/", post(upload_media)) .route("/{id}", get(get_media_details).delete(delete_media)) .route("/{id}/file", get(get_media_file)) - .layer(DefaultBodyLimit::max(250 * 1024 * 1024)) + .layer(DefaultBodyLimit::max(max_size_mb * 1024 * 1024)) } async fn upload_media( diff --git a/libertas_api/src/services/media_service.rs b/libertas_api/src/services/media_service.rs index 2628cde..57beef5 100644 --- a/libertas_api/src/services/media_service.rs +++ b/libertas_api/src/services/media_service.rs @@ -226,7 +226,7 @@ impl MediaService for MediaServiceImpl { .update_storage_used(user.id, -file_size) .await?; - let job_payload = json!({ "media_id": id }); + let job_payload = json!({ "storage_path": media.storage_path }); self.nats_client .publish("media.deleted".to_string(), job_payload.to_string().into()) .await diff --git a/libertas_core/src/config.rs b/libertas_core/src/config.rs index ad80543..511417d 100644 --- a/libertas_core/src/config.rs +++ b/libertas_core/src/config.rs @@ -1,7 +1,5 @@ use serde::Deserialize; -use crate::error::CoreResult; - #[derive(Deserialize, Clone)] pub enum DatabaseType { Postgres, @@ -21,17 +19,5 @@ pub struct Config { pub jwt_secret: String, pub media_library_path: String, pub broker_url: String, -} - -pub fn load_config() -> CoreResult { - Ok(Config { - database: DatabaseConfig { - db_type: DatabaseType::Postgres, - url: "postgres://postgres:postgres@localhost:5432/libertas_db".to_string(), - }, - server_address: "127.0.0.1:8080".to_string(), - jwt_secret: "super_secret_jwt_key".to_string(), - media_library_path: "media_library".to_string(), - broker_url: "amqp://guest:guest@localhost:5672/".to_string(), - }) + pub max_upload_size_mb: Option, } diff --git a/libertas_worker/Cargo.toml b/libertas_worker/Cargo.toml index ee62a1b..ced273a 100644 --- a/libertas_worker/Cargo.toml +++ b/libertas_worker/Cargo.toml @@ -24,3 +24,4 @@ bytes = "1.10.1" uuid = { version = "1.18.1", features = ["v4", "serde"] } nom-exif = { version = "2.5.4", features = ["serde", "tokio", "async"] } async-trait = "0.1.89" +xmp_toolkit = "1.11.0" diff --git a/libertas_worker/src/config.rs b/libertas_worker/src/config.rs index b57de04..a239a55 100644 --- a/libertas_worker/src/config.rs +++ b/libertas_worker/src/config.rs @@ -13,5 +13,6 @@ pub fn load_config() -> CoreResult { jwt_secret: "super_secret_jwt_key".to_string(), media_library_path: "media_library".to_string(), broker_url: "nats://localhost:4222".to_string(), + max_upload_size_mb: Some(100), }) } diff --git a/libertas_worker/src/main.rs b/libertas_worker/src/main.rs index c68b471..1b3f2bf 100644 --- a/libertas_worker/src/main.rs +++ b/libertas_worker/src/main.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; use futures_util::StreamExt; use libertas_core::plugins::PluginContext; @@ -6,6 +6,7 @@ use libertas_infra::factory::{ build_album_repository, build_database_pool, build_media_repository, build_user_repository, }; use serde::Deserialize; +use tokio::fs; use uuid::Uuid; use crate::{config::load_config, plugin_manager::PluginManager}; @@ -19,6 +20,11 @@ struct MediaJob { media_id: Uuid, } +#[derive(Deserialize)] +struct MediaDeletedJob { + storage_path: String, +} + #[tokio::main] async fn main() -> anyhow::Result<()> { println!("Starting libertas worker..."); @@ -46,25 +52,42 @@ async fn main() -> anyhow::Result<()> { println!("Connected to NATS server at {}", config.broker_url); - let mut subscriber = nats_client + let mut sub_new = nats_client .queue_subscribe("media.new", "media_processors".to_string()) .await?; println!("Subscribed to 'media.new' queue"); - while let Some(msg) = subscriber.next().await { - let context = context.clone(); - let manager = plugin_manager.clone(); - tokio::spawn(async move { - if let Err(e) = process_job(msg, context, manager).await { - eprintln!("Job failed: {}", e); - } - }); - } + let mut sub_deleted = nats_client + .queue_subscribe("media.deleted", "media_deleters".to_string()) + .await?; + println!("Subscribed to 'media.deleted' queue"); - Ok(()) + loop { + tokio::select! { + Some(msg) = sub_new.next() => { + let context = context.clone(); + let manager = plugin_manager.clone(); + tokio::spawn(async move { + if let Err(e) = process_new_job(msg, context, manager).await { + eprintln!("Job failed: {}", e); + } + }); + }, + + // --- ADD THIS BLOCK --- + Some(msg) = sub_deleted.next() => { + let context = context.clone(); + tokio::spawn(async move { + if let Err(e) = process_deleted_job(msg, context).await { + eprintln!("Deletion job failed: {}", e); + } + }); + }, + } + } } -async fn process_job( +async fn process_new_job( msg: async_nats::Message, context: Arc, plugin_manager: Arc, @@ -85,3 +108,17 @@ async fn process_job( Ok(()) } + +async fn process_deleted_job( + msg: async_nats::Message, + context: Arc, +) -> anyhow::Result<()> { + let payload: MediaDeletedJob = serde_json::from_slice(&msg.payload)?; + let file_path = PathBuf::from(&context.media_library_path).join(&payload.storage_path); + let xmp_path = format!("{}.xmp", file_path.to_string_lossy()); + if let Err(e) = fs::remove_file(xmp_path).await { + println!("Failed to delete XMP sidecar: {}", e); + } + + Ok(()) +} diff --git a/libertas_worker/src/plugin_manager.rs b/libertas_worker/src/plugin_manager.rs index d31d568..3446ab8 100644 --- a/libertas_worker/src/plugin_manager.rs +++ b/libertas_worker/src/plugin_manager.rs @@ -5,7 +5,7 @@ use libertas_core::{ plugins::{MediaProcessorPlugin, PluginContext}, }; -use crate::plugins::exif_reader::ExifReaderPlugin; +use crate::plugins::{exif_reader::ExifReaderPlugin, xmp_writer::XmpWriterPlugin}; pub struct PluginManager { plugins: Vec>, @@ -17,6 +17,8 @@ impl PluginManager { plugins.push(Arc::new(ExifReaderPlugin)); + plugins.push(Arc::new(XmpWriterPlugin)); + println!("PluginManager loaded {} plugins", plugins.len()); Self { plugins } } diff --git a/libertas_worker/src/plugins/mod.rs b/libertas_worker/src/plugins/mod.rs index 94e193b..9f10c23 100644 --- a/libertas_worker/src/plugins/mod.rs +++ b/libertas_worker/src/plugins/mod.rs @@ -1 +1,2 @@ pub mod exif_reader; +pub mod xmp_writer; diff --git a/libertas_worker/src/plugins/xmp_writer.rs b/libertas_worker/src/plugins/xmp_writer.rs new file mode 100644 index 0000000..c90a8aa --- /dev/null +++ b/libertas_worker/src/plugins/xmp_writer.rs @@ -0,0 +1,54 @@ +use std::path::PathBuf; + +use async_trait::async_trait; +use libertas_core::{ + error::{CoreError, CoreResult}, + models::Media, + plugins::{MediaProcessorPlugin, PluginContext, PluginData}, +}; +use tokio::fs; +use xmp_toolkit::XmpMeta; + +pub struct XmpWriterPlugin; + +#[async_trait] +impl MediaProcessorPlugin for XmpWriterPlugin { + fn name(&self) -> &'static str { + "xmp_writer" + } + + async fn process(&self, media: &Media, context: &PluginContext) -> CoreResult { + let fresh_media = context + .media_repo + .find_by_id(media.id) + .await? + .ok_or(CoreError::NotFound("Media".to_string(), media.id))?; + + let file_path = PathBuf::from(&context.media_library_path).join(&fresh_media.storage_path); + let xmp_path = format!("{}.xmp", file_path.to_string_lossy()); + + let mut xmp = XmpMeta::new() + .map_err(|e| CoreError::Unknown(format!("Failed to create new XMP metadata: {}", e)))?; + + xmp.set_property( + "http://purl.org/dc/elements/1.1/", + "description", + &fresh_media.original_filename.into(), + ) + .map_err(|e| { + CoreError::Unknown(format!("Failed to set description property in XMP: {}", e)) + })?; + + if let Some(_location) = &fresh_media.extracted_location { + // TODO: Set location properties in XMP + } + + let xmp_str = xmp.to_string(); + + fs::write(&xmp_path, xmp_str).await?; + + Ok(PluginData { + message: "XMP sidecar written successfully.".to_string(), + }) + } +}