diff --git a/Cargo.lock b/Cargo.lock index 3221aba..73243dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,15 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "approx" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6" +dependencies = [ + "num-traits", +] + [[package]] name = "argon2" version = "0.5.3" @@ -730,6 +739,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "geo-types" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75a4dcd69d35b2c87a7c83bce9af69fd65c9d68d3833a0ded568983928f3fc99" +dependencies = [ + "approx", + "num-traits", + "serde", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -1072,6 +1092,16 @@ dependencies = [ "hashbrown 0.16.0", ] +[[package]] +name = "iso6709parse" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5090db9c6a716d1f4eeb729957e889e9c28156061c825cbccd44950cf0f3c66" +dependencies = [ + "geo-types", + "nom", +] + [[package]] name = "itoa" version = "1.0.15" @@ -1184,10 +1214,12 @@ version = "0.1.0" dependencies = [ "anyhow", "async-nats", + "async-trait", "bytes", "futures-util", "libertas_core", "libertas_infra", + "nom-exif", "serde", "serde_json", "sqlx", @@ -1272,6 +1304,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "1.1.0" @@ -1315,6 +1353,33 @@ dependencies = [ "signatory", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "nom-exif" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a6703c263bdeb67ea61c7a7605ebfd42996c49cbf8558724b88fd67804f35d" +dependencies = [ + "bytes", + "chrono", + "iso6709parse", + "nom", + "regex", + "serde", + "thiserror 2.0.17", + "tokio", + "tracing", +] + [[package]] name = "nuid" version = "0.5.0" diff --git a/libertas_api/src/main.rs b/libertas_api/src/main.rs index 2924f0c..f8e257d 100644 --- a/libertas_api/src/main.rs +++ b/libertas_api/src/main.rs @@ -5,7 +5,6 @@ pub mod error; pub mod factory; pub mod handlers; pub mod middleware; -pub mod repositories; pub mod routes; pub mod security; pub mod services; diff --git a/libertas_api/src/repositories/album_repository.rs b/libertas_api/src/repositories/album_repository.rs deleted file mode 100644 index 46fd502..0000000 --- a/libertas_api/src/repositories/album_repository.rs +++ /dev/null @@ -1,91 +0,0 @@ -use async_trait::async_trait; -use libertas_core::{ - error::{CoreError, CoreResult}, - models::Album, - repositories::AlbumRepository, -}; -use sqlx::PgPool; -use uuid::Uuid; - -#[derive(Clone)] -pub struct PostgresAlbumRepository { - pool: PgPool, -} - -impl PostgresAlbumRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl AlbumRepository for PostgresAlbumRepository { - async fn create(&self, album: Album) -> CoreResult<()> { - sqlx::query!( - r#" - INSERT INTO albums (id, owner_id, name, description, is_public, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7) - "#, - album.id, - album.owner_id, - album.name, - album.description, - album.is_public, - album.created_at, - album.updated_at - ) - .execute(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string()))?; - - Ok(()) - } - - async fn find_by_id(&self, id: Uuid) -> CoreResult> { - sqlx::query_as!( - Album, - r#" - SELECT id, owner_id, name, description, is_public, created_at, updated_at - FROM albums - WHERE id = $1 - "#, - id - ) - .fetch_optional(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string())) - } - - async fn list_by_user(&self, user_id: Uuid) -> CoreResult> { - sqlx::query_as!( - Album, - r#" - SELECT id, owner_id, name, description, is_public, created_at, updated_at - FROM albums - WHERE owner_id = $1 - "#, - user_id - ) - .fetch_all(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string())) - } - - async fn add_media_to_album(&self, album_id: Uuid, media_ids: &[Uuid]) -> CoreResult<()> { - // Use sqlx's `unnest` feature to pass the Vec efficiently - sqlx::query!( - r#" - INSERT INTO album_media (album_id, media_id) - SELECT $1, media_id FROM unnest($2::uuid[]) as media_id - ON CONFLICT (album_id, media_id) DO NOTHING - "#, - album_id, - media_ids - ) - .execute(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string()))?; - - Ok(()) - } -} diff --git a/libertas_api/src/repositories/media_repository.rs b/libertas_api/src/repositories/media_repository.rs deleted file mode 100644 index 7983b73..0000000 --- a/libertas_api/src/repositories/media_repository.rs +++ /dev/null @@ -1,93 +0,0 @@ -use async_trait::async_trait; -use libertas_core::{ - error::{CoreError, CoreResult}, - models::Media, - repositories::MediaRepository, -}; -use sqlx::PgPool; -use uuid::Uuid; - -#[derive(Clone)] -pub struct PostgresMediaRepository { - pool: PgPool, -} - -impl PostgresMediaRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl MediaRepository for PostgresMediaRepository { - async fn create(&self, media: &Media) -> CoreResult<()> { - sqlx::query!( - r#" - INSERT INTO media (id, owner_id, storage_path, original_filename, mime_type, hash, created_at, width, height) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - "#, - media.id, - media.owner_id, - media.storage_path, - media.original_filename, - media.mime_type, - media.hash, - media.created_at, - media.width, - media.height - ) - .execute(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string()))?; - - Ok(()) - } - - async fn find_by_hash(&self, hash: &str) -> CoreResult> { - sqlx::query_as!( - Media, - r#" - SELECT id, owner_id, storage_path, original_filename, mime_type, hash, created_at, - extracted_location, width, height - FROM media - WHERE hash = $1 - "#, - hash - ) - .fetch_optional(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string())) - } - - async fn find_by_id(&self, id: Uuid) -> CoreResult> { - sqlx::query_as!( - Media, - r#" - SELECT id, owner_id, storage_path, original_filename, mime_type, hash, created_at, - extracted_location, width, height - FROM media - WHERE id = $1 - "#, - id - ) - .fetch_optional(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string())) - } - - async fn list_by_user(&self, user_id: Uuid) -> CoreResult> { - sqlx::query_as!( - Media, - r#" - SELECT id, owner_id, storage_path, original_filename, mime_type, hash, created_at, - extracted_location, width, height - FROM media - WHERE owner_id = $1 - "#, - user_id - ) - .fetch_all(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string())) - } -} diff --git a/libertas_api/src/repositories/mod.rs b/libertas_api/src/repositories/mod.rs deleted file mode 100644 index f1f652d..0000000 --- a/libertas_api/src/repositories/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod album_repository; -pub mod media_repository; -pub mod user_repository; diff --git a/libertas_api/src/repositories/user_repository.rs b/libertas_api/src/repositories/user_repository.rs deleted file mode 100644 index b18cc0e..0000000 --- a/libertas_api/src/repositories/user_repository.rs +++ /dev/null @@ -1,96 +0,0 @@ -use async_trait::async_trait; -use libertas_core::{ - error::{CoreError, CoreResult}, - models::User, - repositories::UserRepository, -}; -use sqlx::{PgPool, SqlitePool, types::Uuid}; - -#[derive(Clone)] -pub struct PostgresUserRepository { - pool: PgPool, -} - -impl PostgresUserRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[derive(Clone)] -pub struct SqliteUserRepository { - _pool: SqlitePool, -} - -impl SqliteUserRepository { - pub fn new(pool: SqlitePool) -> Self { - Self { _pool: pool } - } -} - -#[async_trait] -impl UserRepository for PostgresUserRepository { - async fn create(&self, user: User) -> CoreResult<()> { - sqlx::query!( - r#" - INSERT INTO users (id, username, email, hashed_password, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6) - "#, - user.id, - user.username, - user.email, - user.hashed_password, - user.created_at, - user.updated_at - ) - .execute(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string()))?; - - Ok(()) - } - - async fn find_by_email(&self, email: &str) -> CoreResult> { - sqlx::query_as!(User, "SELECT * FROM users WHERE email = $1", email) - .fetch_optional(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string())) - } - - async fn find_by_username(&self, username: &str) -> CoreResult> { - sqlx::query_as!(User, "SELECT * FROM users WHERE username = $1", username) - .fetch_optional(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string())) - } - - async fn find_by_id(&self, id: Uuid) -> CoreResult> { - sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id) - .fetch_optional(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string())) - } -} - -#[async_trait] -impl UserRepository for SqliteUserRepository { - async fn create(&self, _user: User) -> CoreResult<()> { - println!("SQLITE REPO: Creating user"); - Ok(()) - } - - async fn find_by_email(&self, _email: &str) -> CoreResult> { - println!("SQLITE REPO: Finding user by email"); - Ok(None) - } - - async fn find_by_username(&self, _username: &str) -> CoreResult> { - println!("SQLITE REPO: Finding user by username"); - Ok(None) - } - - async fn find_by_id(&self, _id: Uuid) -> CoreResult> { - println!("SQLITE REPO: Finding user by id"); - Ok(None) - } -} diff --git a/libertas_core/src/plugins.rs b/libertas_core/src/plugins.rs index 550c604..97d26bb 100644 --- a/libertas_core/src/plugins.rs +++ b/libertas_core/src/plugins.rs @@ -16,6 +16,7 @@ pub struct PluginContext { pub media_repo: Arc, pub album_repo: Arc, pub user_repo: Arc, + pub media_library_path: String, } #[async_trait] diff --git a/libertas_core/src/repositories.rs b/libertas_core/src/repositories.rs index 345240f..22c9407 100644 --- a/libertas_core/src/repositories.rs +++ b/libertas_core/src/repositories.rs @@ -12,6 +12,13 @@ pub trait MediaRepository: Send + Sync { async fn create(&self, media: &Media) -> CoreResult<()>; async fn find_by_id(&self, id: Uuid) -> CoreResult>; async fn list_by_user(&self, user_id: Uuid) -> CoreResult>; + async fn update_metadata( + &self, + id: Uuid, + width: Option, + height: Option, + location: Option, + ) -> CoreResult<()>; } #[async_trait] diff --git a/libertas_infra/src/repositories/media_repository.rs b/libertas_infra/src/repositories/media_repository.rs index 7983b73..8fc03ab 100644 --- a/libertas_infra/src/repositories/media_repository.rs +++ b/libertas_infra/src/repositories/media_repository.rs @@ -90,4 +90,29 @@ impl MediaRepository for PostgresMediaRepository { .await .map_err(|e| CoreError::Database(e.to_string())) } + + async fn update_metadata( + &self, + id: Uuid, + width: Option, + height: Option, + location: Option, + ) -> CoreResult<()> { + sqlx::query!( + r#" + UPDATE media + SET width = $2, height = $3, extracted_location = $4 + WHERE id = $1 + "#, + id, + width, + height, + location + ) + .execute(&self.pool) + .await + .map_err(|e| CoreError::Database(e.to_string()))?; + + Ok(()) + } } diff --git a/libertas_worker/Cargo.toml b/libertas_worker/Cargo.toml index 87adee7..ee62a1b 100644 --- a/libertas_worker/Cargo.toml +++ b/libertas_worker/Cargo.toml @@ -22,3 +22,5 @@ sqlx = { version = "0.8.6", features = [ futures-util = "0.3.31" bytes = "1.10.1" uuid = { version = "1.18.1", features = ["v4", "serde"] } +nom-exif = { version = "2.5.4", features = ["serde", "tokio", "async"] } +async-trait = "0.1.89" diff --git a/libertas_worker/src/main.rs b/libertas_worker/src/main.rs index 1f18bf2..c68b471 100644 --- a/libertas_worker/src/main.rs +++ b/libertas_worker/src/main.rs @@ -8,9 +8,11 @@ use libertas_infra::factory::{ use serde::Deserialize; use uuid::Uuid; -use crate::config::load_config; +use crate::{config::load_config, plugin_manager::PluginManager}; pub mod config; +pub mod plugin_manager; +pub mod plugins; #[derive(Deserialize)] struct MediaJob { @@ -30,14 +32,16 @@ async fn main() -> anyhow::Result<()> { let album_repo = build_album_repository(&config.database, db_pool.clone()).await?; let user_repo = build_user_repository(&config.database, db_pool.clone()).await?; - // 3. Create the abstracted PluginContext let context = Arc::new(PluginContext { media_repo, album_repo, user_repo, + media_library_path: config.media_library_path.clone(), }); println!("Plugin context created."); + let plugin_manager = Arc::new(PluginManager::new()); + let nats_client = async_nats::connect(&config.broker_url).await?; println!("Connected to NATS server at {}", config.broker_url); @@ -49,8 +53,9 @@ async fn main() -> anyhow::Result<()> { while let Some(msg) = subscriber.next().await { let context = context.clone(); + let manager = plugin_manager.clone(); tokio::spawn(async move { - if let Err(e) = process_job(msg, context).await { + if let Err(e) = process_job(msg, context, manager).await { eprintln!("Job failed: {}", e); } }); @@ -59,7 +64,11 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn process_job(msg: async_nats::Message, context: Arc) -> anyhow::Result<()> { +async fn process_job( + msg: async_nats::Message, + context: Arc, + plugin_manager: Arc, +) -> anyhow::Result<()> { let job: MediaJob = serde_json::from_slice(&msg.payload)?; let media = context @@ -70,10 +79,8 @@ async fn process_job(msg: async_nats::Message, context: Arc) -> a println!("Processing media: {}", media.original_filename); - // 3. Pass to the (future) PluginManager - // plugin_manager.process(&media, &context).await?; + plugin_manager.process_media(&media, &context).await; - // For now, we'll just print a success message println!("Successfully processed job for media_id: {}", media.id); Ok(()) diff --git a/libertas_worker/src/plugin_manager.rs b/libertas_worker/src/plugin_manager.rs new file mode 100644 index 0000000..d31d568 --- /dev/null +++ b/libertas_worker/src/plugin_manager.rs @@ -0,0 +1,38 @@ +use std::sync::Arc; + +use libertas_core::{ + models::Media, + plugins::{MediaProcessorPlugin, PluginContext}, +}; + +use crate::plugins::exif_reader::ExifReaderPlugin; + +pub struct PluginManager { + plugins: Vec>, +} + +impl PluginManager { + pub fn new() -> Self { + let mut plugins: Vec> = Vec::new(); + + plugins.push(Arc::new(ExifReaderPlugin)); + + println!("PluginManager loaded {} plugins", plugins.len()); + Self { plugins } + } + + pub async fn process_media(&self, media: &Media, context: &Arc) { + println!( + "PluginManager processing media: {}", + media.original_filename + ); + for plugin in &self.plugins { + println!("Running plugin: {}", plugin.name()); + match plugin.process(media, context).await { + Ok(data) => println!("Plugin {} succeeded: {}", plugin.name(), data.message), + Err(e) => eprintln!("Plugin {} failed: {}", plugin.name(), e), + } + } + println!("PluginManager finished processing media: {}", media.id); + } +} diff --git a/libertas_worker/src/plugins/exif_reader.rs b/libertas_worker/src/plugins/exif_reader.rs new file mode 100644 index 0000000..78d7bba --- /dev/null +++ b/libertas_worker/src/plugins/exif_reader.rs @@ -0,0 +1,79 @@ +use async_trait::async_trait; +use std::path::PathBuf; + +use libertas_core::{ + error::{CoreError, CoreResult}, + models::Media, + plugins::{MediaProcessorPlugin, PluginContext, PluginData}, +}; +use nom_exif::{AsyncMediaParser, AsyncMediaSource, Exif, ExifIter, ExifTag}; + +pub struct ExifReaderPlugin; + +#[async_trait] +impl MediaProcessorPlugin for ExifReaderPlugin { + fn name(&self) -> &'static str { + "exif_reader" + } + + async fn process(&self, media: &Media, context: &PluginContext) -> CoreResult { + let file_path = PathBuf::from(&context.media_library_path).join(&media.storage_path); + + let ms = match AsyncMediaSource::file_path(file_path).await { + Ok(ms) => ms, + Err(e) => return Err(CoreError::Unknown(format!("Failed to open a file: {}", e))), + }; + + if !ms.has_exif() { + return Ok(PluginData { + message: "No EXIF data found in file.".to_string(), + }); + } + + let mut parser = AsyncMediaParser::new(); + let iter: ExifIter = match parser.parse(ms).await { + Ok(iter) => iter, + Err(e) => { + // It's not a fatal error, just means parsing failed (e.g., corrupt data) + return Ok(PluginData { + message: format!("Could not parse EXIF: {}", e), + }); + } + }; + + let location: Option = match iter.parse_gps_info() { + Ok(Some(gps_info)) => Some(gps_info.format_iso6709()), + Ok(None) => None, + Err(_) => None, + }; + + let exif: Exif = iter.into(); + + let width = exif + .get(ExifTag::ExifImageWidth) + .and_then(|f| f.as_u32()) + .map(|v| v as i32); + + let height = exif + .get(ExifTag::ExifImageHeight) + .and_then(|f| f.as_u32()) + .map(|v| v as i32); + + if width.is_some() || height.is_some() || location.is_some() { + context + .media_repo + .update_metadata(media.id, width, height, location.clone()) + .await?; + + let message = format!( + "Extracted EXIF: width={:?}, height={:?}, location={:?}", + width, height, location + ); + Ok(PluginData { message }) + } else { + Ok(PluginData { + message: "No EXIF width/height or GPS location found.".to_string(), + }) + } + } +} diff --git a/libertas_worker/src/plugins/mod.rs b/libertas_worker/src/plugins/mod.rs new file mode 100644 index 0000000..94e193b --- /dev/null +++ b/libertas_worker/src/plugins/mod.rs @@ -0,0 +1 @@ +pub mod exif_reader;