feat: implement media processing plugins and update repository structure

This commit is contained in:
2025-11-02 15:40:39 +01:00
parent a5a88c7f33
commit 4427428cf6
14 changed files with 232 additions and 291 deletions

65
Cargo.lock generated
View File

@@ -32,6 +32,15 @@ version = "1.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
[[package]]
name = "approx"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6"
dependencies = [
"num-traits",
]
[[package]]
name = "argon2"
version = "0.5.3"
@@ -730,6 +739,17 @@ dependencies = [
"zeroize",
]
[[package]]
name = "geo-types"
version = "0.7.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75a4dcd69d35b2c87a7c83bce9af69fd65c9d68d3833a0ded568983928f3fc99"
dependencies = [
"approx",
"num-traits",
"serde",
]
[[package]]
name = "getrandom"
version = "0.2.16"
@@ -1072,6 +1092,16 @@ dependencies = [
"hashbrown 0.16.0",
]
[[package]]
name = "iso6709parse"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5090db9c6a716d1f4eeb729957e889e9c28156061c825cbccd44950cf0f3c66"
dependencies = [
"geo-types",
"nom",
]
[[package]]
name = "itoa"
version = "1.0.15"
@@ -1184,10 +1214,12 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-nats",
"async-trait",
"bytes",
"futures-util",
"libertas_core",
"libertas_infra",
"nom-exif",
"serde",
"serde_json",
"sqlx",
@@ -1272,6 +1304,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "mio"
version = "1.1.0"
@@ -1315,6 +1353,33 @@ dependencies = [
"signatory",
]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nom-exif"
version = "2.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5a6703c263bdeb67ea61c7a7605ebfd42996c49cbf8558724b88fd67804f35d"
dependencies = [
"bytes",
"chrono",
"iso6709parse",
"nom",
"regex",
"serde",
"thiserror 2.0.17",
"tokio",
"tracing",
]
[[package]]
name = "nuid"
version = "0.5.0"

View File

@@ -5,7 +5,6 @@ pub mod error;
pub mod factory;
pub mod handlers;
pub mod middleware;
pub mod repositories;
pub mod routes;
pub mod security;
pub mod services;

View File

@@ -1,91 +0,0 @@
use async_trait::async_trait;
use libertas_core::{
error::{CoreError, CoreResult},
models::Album,
repositories::AlbumRepository,
};
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Clone)]
pub struct PostgresAlbumRepository {
pool: PgPool,
}
impl PostgresAlbumRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl AlbumRepository for PostgresAlbumRepository {
async fn create(&self, album: Album) -> CoreResult<()> {
sqlx::query!(
r#"
INSERT INTO albums (id, owner_id, name, description, is_public, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
album.id,
album.owner_id,
album.name,
album.description,
album.is_public,
album.created_at,
album.updated_at
)
.execute(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
Ok(())
}
async fn find_by_id(&self, id: Uuid) -> CoreResult<Option<Album>> {
sqlx::query_as!(
Album,
r#"
SELECT id, owner_id, name, description, is_public, created_at, updated_at
FROM albums
WHERE id = $1
"#,
id
)
.fetch_optional(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
async fn list_by_user(&self, user_id: Uuid) -> CoreResult<Vec<Album>> {
sqlx::query_as!(
Album,
r#"
SELECT id, owner_id, name, description, is_public, created_at, updated_at
FROM albums
WHERE owner_id = $1
"#,
user_id
)
.fetch_all(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
async fn add_media_to_album(&self, album_id: Uuid, media_ids: &[Uuid]) -> CoreResult<()> {
// Use sqlx's `unnest` feature to pass the Vec<Uuid> efficiently
sqlx::query!(
r#"
INSERT INTO album_media (album_id, media_id)
SELECT $1, media_id FROM unnest($2::uuid[]) as media_id
ON CONFLICT (album_id, media_id) DO NOTHING
"#,
album_id,
media_ids
)
.execute(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
Ok(())
}
}

View File

@@ -1,93 +0,0 @@
use async_trait::async_trait;
use libertas_core::{
error::{CoreError, CoreResult},
models::Media,
repositories::MediaRepository,
};
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Clone)]
pub struct PostgresMediaRepository {
pool: PgPool,
}
impl PostgresMediaRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl MediaRepository for PostgresMediaRepository {
async fn create(&self, media: &Media) -> CoreResult<()> {
sqlx::query!(
r#"
INSERT INTO media (id, owner_id, storage_path, original_filename, mime_type, hash, created_at, width, height)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"#,
media.id,
media.owner_id,
media.storage_path,
media.original_filename,
media.mime_type,
media.hash,
media.created_at,
media.width,
media.height
)
.execute(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
Ok(())
}
async fn find_by_hash(&self, hash: &str) -> CoreResult<Option<Media>> {
sqlx::query_as!(
Media,
r#"
SELECT id, owner_id, storage_path, original_filename, mime_type, hash, created_at,
extracted_location, width, height
FROM media
WHERE hash = $1
"#,
hash
)
.fetch_optional(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
async fn find_by_id(&self, id: Uuid) -> CoreResult<Option<Media>> {
sqlx::query_as!(
Media,
r#"
SELECT id, owner_id, storage_path, original_filename, mime_type, hash, created_at,
extracted_location, width, height
FROM media
WHERE id = $1
"#,
id
)
.fetch_optional(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
async fn list_by_user(&self, user_id: Uuid) -> CoreResult<Vec<Media>> {
sqlx::query_as!(
Media,
r#"
SELECT id, owner_id, storage_path, original_filename, mime_type, hash, created_at,
extracted_location, width, height
FROM media
WHERE owner_id = $1
"#,
user_id
)
.fetch_all(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
}

View File

@@ -1,3 +0,0 @@
pub mod album_repository;
pub mod media_repository;
pub mod user_repository;

View File

@@ -1,96 +0,0 @@
use async_trait::async_trait;
use libertas_core::{
error::{CoreError, CoreResult},
models::User,
repositories::UserRepository,
};
use sqlx::{PgPool, SqlitePool, types::Uuid};
#[derive(Clone)]
pub struct PostgresUserRepository {
pool: PgPool,
}
impl PostgresUserRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[derive(Clone)]
pub struct SqliteUserRepository {
_pool: SqlitePool,
}
impl SqliteUserRepository {
pub fn new(pool: SqlitePool) -> Self {
Self { _pool: pool }
}
}
#[async_trait]
impl UserRepository for PostgresUserRepository {
async fn create(&self, user: User) -> CoreResult<()> {
sqlx::query!(
r#"
INSERT INTO users (id, username, email, hashed_password, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6)
"#,
user.id,
user.username,
user.email,
user.hashed_password,
user.created_at,
user.updated_at
)
.execute(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
Ok(())
}
async fn find_by_email(&self, email: &str) -> CoreResult<Option<User>> {
sqlx::query_as!(User, "SELECT * FROM users WHERE email = $1", email)
.fetch_optional(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
async fn find_by_username(&self, username: &str) -> CoreResult<Option<User>> {
sqlx::query_as!(User, "SELECT * FROM users WHERE username = $1", username)
.fetch_optional(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
async fn find_by_id(&self, id: Uuid) -> CoreResult<Option<User>> {
sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
.fetch_optional(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
}
#[async_trait]
impl UserRepository for SqliteUserRepository {
async fn create(&self, _user: User) -> CoreResult<()> {
println!("SQLITE REPO: Creating user");
Ok(())
}
async fn find_by_email(&self, _email: &str) -> CoreResult<Option<User>> {
println!("SQLITE REPO: Finding user by email");
Ok(None)
}
async fn find_by_username(&self, _username: &str) -> CoreResult<Option<User>> {
println!("SQLITE REPO: Finding user by username");
Ok(None)
}
async fn find_by_id(&self, _id: Uuid) -> CoreResult<Option<User>> {
println!("SQLITE REPO: Finding user by id");
Ok(None)
}
}

View File

@@ -16,6 +16,7 @@ pub struct PluginContext {
pub media_repo: Arc<dyn MediaRepository>,
pub album_repo: Arc<dyn AlbumRepository>,
pub user_repo: Arc<dyn UserRepository>,
pub media_library_path: String,
}
#[async_trait]

View File

@@ -12,6 +12,13 @@ pub trait MediaRepository: Send + Sync {
async fn create(&self, media: &Media) -> CoreResult<()>;
async fn find_by_id(&self, id: Uuid) -> CoreResult<Option<Media>>;
async fn list_by_user(&self, user_id: Uuid) -> CoreResult<Vec<Media>>;
async fn update_metadata(
&self,
id: Uuid,
width: Option<i32>,
height: Option<i32>,
location: Option<String>,
) -> CoreResult<()>;
}
#[async_trait]

View File

@@ -90,4 +90,29 @@ impl MediaRepository for PostgresMediaRepository {
.await
.map_err(|e| CoreError::Database(e.to_string()))
}
async fn update_metadata(
&self,
id: Uuid,
width: Option<i32>,
height: Option<i32>,
location: Option<String>,
) -> CoreResult<()> {
sqlx::query!(
r#"
UPDATE media
SET width = $2, height = $3, extracted_location = $4
WHERE id = $1
"#,
id,
width,
height,
location
)
.execute(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
Ok(())
}
}

View File

@@ -22,3 +22,5 @@ sqlx = { version = "0.8.6", features = [
futures-util = "0.3.31"
bytes = "1.10.1"
uuid = { version = "1.18.1", features = ["v4", "serde"] }
nom-exif = { version = "2.5.4", features = ["serde", "tokio", "async"] }
async-trait = "0.1.89"

View File

@@ -8,9 +8,11 @@ use libertas_infra::factory::{
use serde::Deserialize;
use uuid::Uuid;
use crate::config::load_config;
use crate::{config::load_config, plugin_manager::PluginManager};
pub mod config;
pub mod plugin_manager;
pub mod plugins;
#[derive(Deserialize)]
struct MediaJob {
@@ -30,14 +32,16 @@ async fn main() -> anyhow::Result<()> {
let album_repo = build_album_repository(&config.database, db_pool.clone()).await?;
let user_repo = build_user_repository(&config.database, db_pool.clone()).await?;
// 3. Create the abstracted PluginContext
let context = Arc::new(PluginContext {
media_repo,
album_repo,
user_repo,
media_library_path: config.media_library_path.clone(),
});
println!("Plugin context created.");
let plugin_manager = Arc::new(PluginManager::new());
let nats_client = async_nats::connect(&config.broker_url).await?;
println!("Connected to NATS server at {}", config.broker_url);
@@ -49,8 +53,9 @@ async fn main() -> anyhow::Result<()> {
while let Some(msg) = subscriber.next().await {
let context = context.clone();
let manager = plugin_manager.clone();
tokio::spawn(async move {
if let Err(e) = process_job(msg, context).await {
if let Err(e) = process_job(msg, context, manager).await {
eprintln!("Job failed: {}", e);
}
});
@@ -59,7 +64,11 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
async fn process_job(msg: async_nats::Message, context: Arc<PluginContext>) -> anyhow::Result<()> {
async fn process_job(
msg: async_nats::Message,
context: Arc<PluginContext>,
plugin_manager: Arc<PluginManager>,
) -> anyhow::Result<()> {
let job: MediaJob = serde_json::from_slice(&msg.payload)?;
let media = context
@@ -70,10 +79,8 @@ async fn process_job(msg: async_nats::Message, context: Arc<PluginContext>) -> a
println!("Processing media: {}", media.original_filename);
// 3. Pass to the (future) PluginManager
// plugin_manager.process(&media, &context).await?;
plugin_manager.process_media(&media, &context).await;
// For now, we'll just print a success message
println!("Successfully processed job for media_id: {}", media.id);
Ok(())

View File

@@ -0,0 +1,38 @@
use std::sync::Arc;
use libertas_core::{
models::Media,
plugins::{MediaProcessorPlugin, PluginContext},
};
use crate::plugins::exif_reader::ExifReaderPlugin;
pub struct PluginManager {
plugins: Vec<Arc<dyn MediaProcessorPlugin>>,
}
impl PluginManager {
pub fn new() -> Self {
let mut plugins: Vec<Arc<dyn MediaProcessorPlugin>> = Vec::new();
plugins.push(Arc::new(ExifReaderPlugin));
println!("PluginManager loaded {} plugins", plugins.len());
Self { plugins }
}
pub async fn process_media(&self, media: &Media, context: &Arc<PluginContext>) {
println!(
"PluginManager processing media: {}",
media.original_filename
);
for plugin in &self.plugins {
println!("Running plugin: {}", plugin.name());
match plugin.process(media, context).await {
Ok(data) => println!("Plugin {} succeeded: {}", plugin.name(), data.message),
Err(e) => eprintln!("Plugin {} failed: {}", plugin.name(), e),
}
}
println!("PluginManager finished processing media: {}", media.id);
}
}

View File

@@ -0,0 +1,79 @@
use async_trait::async_trait;
use std::path::PathBuf;
use libertas_core::{
error::{CoreError, CoreResult},
models::Media,
plugins::{MediaProcessorPlugin, PluginContext, PluginData},
};
use nom_exif::{AsyncMediaParser, AsyncMediaSource, Exif, ExifIter, ExifTag};
pub struct ExifReaderPlugin;
#[async_trait]
impl MediaProcessorPlugin for ExifReaderPlugin {
fn name(&self) -> &'static str {
"exif_reader"
}
async fn process(&self, media: &Media, context: &PluginContext) -> CoreResult<PluginData> {
let file_path = PathBuf::from(&context.media_library_path).join(&media.storage_path);
let ms = match AsyncMediaSource::file_path(file_path).await {
Ok(ms) => ms,
Err(e) => return Err(CoreError::Unknown(format!("Failed to open a file: {}", e))),
};
if !ms.has_exif() {
return Ok(PluginData {
message: "No EXIF data found in file.".to_string(),
});
}
let mut parser = AsyncMediaParser::new();
let iter: ExifIter = match parser.parse(ms).await {
Ok(iter) => iter,
Err(e) => {
// It's not a fatal error, just means parsing failed (e.g., corrupt data)
return Ok(PluginData {
message: format!("Could not parse EXIF: {}", e),
});
}
};
let location: Option<String> = match iter.parse_gps_info() {
Ok(Some(gps_info)) => Some(gps_info.format_iso6709()),
Ok(None) => None,
Err(_) => None,
};
let exif: Exif = iter.into();
let width = exif
.get(ExifTag::ExifImageWidth)
.and_then(|f| f.as_u32())
.map(|v| v as i32);
let height = exif
.get(ExifTag::ExifImageHeight)
.and_then(|f| f.as_u32())
.map(|v| v as i32);
if width.is_some() || height.is_some() || location.is_some() {
context
.media_repo
.update_metadata(media.id, width, height, location.clone())
.await?;
let message = format!(
"Extracted EXIF: width={:?}, height={:?}, location={:?}",
width, height, location
);
Ok(PluginData { message })
} else {
Ok(PluginData {
message: "No EXIF width/height or GPS location found.".to_string(),
})
}
}
}

View File

@@ -0,0 +1 @@
pub mod exif_reader;