feat: Update dependencies and implement face detection features

- Updated async-nats dependency to version 0.45.0 in both libertas_api and libertas_worker.
- Introduced AI-related structures and traits in libertas_core for face detection.
- Added AiConfig and FaceDetectorRuntime enums to support different face detection methods.
- Implemented TractFaceDetector and RemoteNatsFaceDetector in libertas_infra for local and remote face detection.
- Created FaceDetectionPlugin to integrate face detection into the media processing pipeline.
- Enhanced XMP writing functionality to include face region data.
- Updated PluginManager to initialize face detection plugins based on configuration.
Date: 2025-11-15 21:29:17 +01:00
Parent: e6c941bf28
Commit: 98f56e4f1e
17 changed files with 1045 additions and 101 deletions

.gitignore (2 changes)

@@ -1,6 +1,8 @@
+ai_models/
 target/
 .sqlx/
 media_library/
 thumbnail_library/
+.ai/
 .env

Cargo.lock (683 changes, generated; diff suppressed because it is too large)

libertas_api/Cargo.toml

@@ -31,7 +31,7 @@ rand_core = { version = "0.9.3", features = ["std"] }
 sha2 = "0.10.9"
 futures = "0.3.31"
 bytes = "1.10.1"
-async-nats = "0.44.2"
+async-nats = "0.45.0"
 tower = { version = "0.5.2", features = ["util"] }
 tower-http = { version = "0.6.6", features = ["fs", "trace"] }
 tracing = "0.1.41"

libertas_core/src/ai.rs (new file, 17 lines)

@@ -0,0 +1,17 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use crate::error::CoreResult;

#[derive(Debug, Serialize, Deserialize)]
pub struct BoundingBox {
    pub x_min: f32,
    pub y_min: f32,
    pub x_max: f32,
    pub y_max: f32,
}

#[async_trait]
pub trait FaceDetector: Send + Sync {
    async fn detect_faces(&self, image_bytes: &[u8]) -> CoreResult<Vec<BoundingBox>>;
}
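
The trait is object-safe, so implementors can live behind a `Box<dyn FaceDetector>` (as the plugin below does), and `BoundingBox` derives Serialize/Deserialize so detections can cross process boundaries as JSON. A minimal sketch of a custom implementor against just this trait; the `StubDetector` name and values are hypothetical:

use async_trait::async_trait;
use libertas_core::{
    ai::{BoundingBox, FaceDetector},
    error::CoreResult,
};

// Hypothetical test double: always reports one fixed face region.
struct StubDetector;

#[async_trait]
impl FaceDetector for StubDetector {
    async fn detect_faces(&self, _image_bytes: &[u8]) -> CoreResult<Vec<BoundingBox>> {
        // Pixel coordinates, matching what the real detectors return.
        Ok(vec![BoundingBox { x_min: 10.0, y_min: 20.0, x_max: 110.0, y_max: 140.0 }])
    }
}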

libertas_core/src/config.rs

@@ -31,6 +31,20 @@ pub struct ThumbnailConfig {
     pub library_path: String,
 }

+#[derive(Deserialize, Clone, Debug)]
+#[serde(rename_all = "lowercase")]
+pub enum FaceDetectorRuntime {
+    Tract,
+    Onnx,
+    RemoteNats { subject: String },
+}
+
+#[derive(Deserialize, Clone, Debug)]
+pub struct AiConfig {
+    pub face_detector_runtime: FaceDetectorRuntime,
+    pub face_detector_model_path: Option<String>,
+}
+
 #[derive(Deserialize, Clone, Debug)]
 pub struct Config {
     pub database_url: String,

@@ -50,6 +64,7 @@ pub struct Config {
     pub allowed_sort_columns: Vec<String>,
     pub thumbnail_config: Option<ThumbnailConfig>,
+    pub ai_config: Option<AiConfig>,
 }

 fn default_max_upload_size() -> u32 {

@@ -73,6 +88,7 @@ pub struct AppConfig {
     pub default_storage_quota_gb: Option<u64>,
     pub allowed_sort_columns: Option<Vec<String>>,
     pub thumbnail_config: Option<ThumbnailConfig>,
+    pub ai_config: Option<AiConfig>,
 }

 pub fn load_config() -> CoreResult<AppConfig> {

@@ -108,5 +124,6 @@ pub fn load_config() -> CoreResult<AppConfig> {
         default_storage_quota_gb: Some(config.default_storage_quota_gb),
         allowed_sort_columns: Some(config.allowed_sort_columns),
         thumbnail_config: config.thumbnail_config,
+        ai_config: config.ai_config,
     })
 }
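
With `#[serde(rename_all = "lowercase")]` and serde's default external tagging, the unit variants deserialize from the bare strings "tract" and "onnx", while `RemoteNats` nests its field under a "remotenats" key. A sketch of the accepted shapes, shown as JSON via serde_json purely for illustration (the on-disk format is whatever `load_config` actually reads; the model path and subject here are hypothetical):

use libertas_core::config::AiConfig;

fn demo() -> serde_json::Result<()> {
    // Unit variant: a bare lowercase string selects the local tract runtime.
    let local: AiConfig = serde_json::from_str(
        r#"{ "face_detector_runtime": "tract",
             "face_detector_model_path": "ai_models/face.onnx" }"#,
    )?;

    // Struct variant: externally tagged, so the subject nests under "remotenats".
    let remote: AiConfig = serde_json::from_str(
        r#"{ "face_detector_runtime": { "remotenats": { "subject": "ai.detect_faces" } },
             "face_detector_model_path": null }"#,
    )?;

    println!("{:?} / {:?}", local.face_detector_runtime, remote.face_detector_runtime);
    Ok(())
}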

libertas_core/src/lib.rs

@@ -1,9 +1,10 @@
+pub mod ai;
 pub mod authz;
 pub mod config;
 pub mod error;
+pub mod media_utils;
 pub mod models;
 pub mod plugins;
 pub mod repositories;
 pub mod schema;
 pub mod services;
-pub mod media_utils;

libertas_infra/Cargo.toml

@@ -16,3 +16,9 @@ async-trait = "0.1.89"
 uuid = { version = "1.18.1", features = ["v4"] }
 chrono = "0.4.42"
 serde = { version = "1.0.228", features = ["derive"] }
+async-nats = "0.45.0"
+serde_json = "1.0.145"
+tract-onnx = "0.22.0"
+ndarray = "0.17.1"
+image = "0.25.8"
+tokio = { version = "1.48.0", features = ["full"] }

libertas_infra/src/ai/mod.rs (new file)

@@ -0,0 +1,2 @@
pub mod remote_detector;
pub mod tract_detector;

libertas_infra/src/ai/remote_detector.rs (new file, 40 lines)

@@ -0,0 +1,40 @@
use async_trait::async_trait;
use libertas_core::{
    ai::{BoundingBox, FaceDetector},
    error::{CoreError, CoreResult},
};

pub struct RemoteNatsFaceDetector {
    client: async_nats::Client,
    subject: String,
}

impl RemoteNatsFaceDetector {
    pub fn new(client: async_nats::Client, subject: &str) -> Self {
        Self {
            client,
            subject: subject.to_string(),
        }
    }
}

#[async_trait]
impl FaceDetector for RemoteNatsFaceDetector {
    // TODO: Shipping raw image bytes over NATS is probably not the most efficient
    // encoding; a binary schema such as Protobuf would be a better fit.
    async fn detect_faces(&self, image_bytes: &[u8]) -> CoreResult<Vec<BoundingBox>> {
        println!("Offloading face detection to remote worker via NATS...");
        let bytes = image_bytes.to_vec();
        let response = self
            .client
            .request(self.subject.clone(), bytes.into())
            .await
            .map_err(|e| CoreError::Unknown(format!("NATS request failed: {}", e)))?;

        let boxes: Vec<BoundingBox> = serde_json::from_slice(&response.payload)
            .map_err(|e| CoreError::Unknown(format!("Failed to parse remote response: {}", e)))?;
        Ok(boxes)
    }
}
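
The wire contract here is implicit: the request payload is the raw image file, and the reply payload is a JSON array of `BoundingBox`. A sketch of the matching responder loop a remote worker might run, assuming the same crates; the function name and setup are illustrative, and errors are surfaced through anyhow since `CoreError` is only assumed to implement Display:

use std::sync::Arc;

use futures::StreamExt;
use libertas_core::ai::FaceDetector;

async fn serve_detections(
    client: async_nats::Client,
    subject: String,
    detector: Arc<dyn FaceDetector>,
) -> anyhow::Result<()> {
    let mut requests = client.subscribe(subject).await?;
    while let Some(msg) = requests.next().await {
        // Requests without a reply subject cannot be answered; skip them.
        let Some(reply) = msg.reply else { continue };
        let boxes = detector
            .detect_faces(&msg.payload)
            .await
            .map_err(|e| anyhow::anyhow!("detection failed: {}", e))?;
        client.publish(reply, serde_json::to_vec(&boxes)?.into()).await?;
    }
    Ok(())
}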

libertas_infra/src/ai/tract_detector.rs (new file, 189 lines)

@@ -0,0 +1,189 @@
use std::cmp::Ordering;
use std::sync::Arc;

use async_trait::async_trait;
use image::{GenericImageView, RgbImage, imageops};
use libertas_core::{
    ai::{BoundingBox, FaceDetector},
    error::{CoreError, CoreResult},
};
use tract_onnx::{
    prelude::*,
    tract_core::ndarray::{Array4, Axis, s},
};

type TractModel = SimplePlan<TypedFact, Box<dyn TypedOp>, Graph<TypedFact, Box<dyn TypedOp>>>;

pub struct TractFaceDetector {
    model: Arc<TractModel>,
}

impl TractFaceDetector {
    pub fn new(model_path: &str) -> CoreResult<Self> {
        let model = tract_onnx::onnx()
            .model_for_path(model_path)
            .map_err(|e| CoreError::Config(format!("Failed to load model: {}", e)))?
            .with_input_fact(0, f32::fact([1, 3, 640, 640]).into())
            .map_err(|e| CoreError::Config(format!("Failed to set input fact: {}", e)))?
            .into_optimized()
            .map_err(|e| CoreError::Config(format!("Failed to optimize model: {}", e)))?
            .into_runnable()
            .map_err(|e| CoreError::Config(format!("Failed to make model runnable: {}", e)))?;

        Ok(Self {
            model: Arc::new(model),
        })
    }
}

#[async_trait]
impl FaceDetector for TractFaceDetector {
    async fn detect_faces(&self, image_bytes: &[u8]) -> CoreResult<Vec<BoundingBox>> {
        let image_bytes = image_bytes.to_vec();
        let model = self.model.clone();

        tokio::task::spawn_blocking(move || {
            let img = image::load_from_memory(&image_bytes)
                .map_err(|e| CoreError::Unknown(format!("Failed to load image: {}", e)))?;
            let (original_width, original_height) = img.dimensions();

            // Letterbox to 640x640: scale the long side down, then pad to square.
            let scale = 640.0 / (original_width.max(original_height) as f32);
            let new_width = (original_width as f32 * scale) as u32;
            let new_height = (original_height as f32 * scale) as u32;
            let resized = imageops::resize(
                &img.to_rgb8(),
                new_width,
                new_height,
                imageops::FilterType::Triangle,
            );
            let mut padded = RgbImage::new(640, 640);
            let pad_x = (640 - new_width) as i64 / 2;
            let pad_y = (640 - new_height) as i64 / 2;
            imageops::replace(&mut padded, &resized, pad_x, pad_y);

            // NCHW tensor, normalized to [0, 1].
            let tensor: Tensor = Array4::from_shape_fn((1, 3, 640, 640), |(_, c, y, x)| {
                padded.get_pixel(x as u32, y as u32)[c] as f32 / 255.0
            })
            .into();

            let result = model
                .run(tvec!(tensor.into()))
                .map_err(|e| CoreError::Unknown(format!("Model inference failed: {}", e)))?;
            let results = result[0]
                .to_array_view::<f32>()
                .map_err(|e| {
                    CoreError::Unknown(format!("Failed to convert model output to array: {}", e))
                })?
                .view()
                .t()
                .into_owned();

            let mut bbox_vec: Vec<InternalBbox> = vec![];
            for i in 0..results.len_of(Axis(0)) {
                // One row per candidate (8400 anchors for a 640x640 YOLO-style head).
                let row = results.slice(s![i, .., ..]); // Shape [5, 1]: cx, cy, w, h, confidence
                let confidence = row[[4, 0]];
                if confidence >= 0.5 {
                    // Confidence threshold
                    let x = row[[0, 0]];
                    let y = row[[1, 0]];
                    let w = row[[2, 0]];
                    let h = row[[3, 0]];
                    // Convert (center_x, center_y, w, h) to (x1, y1, x2, y2)
                    let x1 = x - w / 2.0;
                    let y1 = y - h / 2.0;
                    let x2 = x + w / 2.0;
                    let y2 = y + h / 2.0;
                    bbox_vec.push(InternalBbox::new(x1, y1, x2, y2, confidence));
                }
            }

            let final_boxes = non_maximum_suppression(bbox_vec, 0.45); // 0.45 IoU threshold

            // Map surviving boxes back to original image coordinates.
            let boxes: Vec<_> = final_boxes
                .into_iter()
                .map(|b| {
                    // Reverse padding
                    let x1_unpadded = b.x1 - (pad_x as f32);
                    let y1_unpadded = b.y1 - (pad_y as f32);
                    let x2_unpadded = b.x2 - (pad_x as f32);
                    let y2_unpadded = b.y2 - (pad_y as f32);
                    // Reverse scaling and clamp to original image dimensions
                    let x_min = (x1_unpadded / scale).max(0.0);
                    let y_min = (y1_unpadded / scale).max(0.0);
                    let x_max = (x2_unpadded / scale).min(original_width as f32);
                    let y_max = (y2_unpadded / scale).min(original_height as f32);
                    BoundingBox {
                        x_min,
                        y_min,
                        x_max,
                        y_max,
                    }
                })
                .collect();

            println!(
                "Running face detection locally on the CPU... found {} faces.",
                boxes.len()
            );
            Ok(boxes)
        })
        .await
        .map_err(|e| CoreError::Unknown(format!("Failed to run face detection: {}", e)))?
    }
}

#[derive(Debug, Clone)]
struct InternalBbox {
    pub x1: f32,
    pub y1: f32,
    pub x2: f32,
    pub y2: f32,
    pub confidence: f32,
}

impl InternalBbox {
    fn new(x1: f32, y1: f32, x2: f32, y2: f32, confidence: f32) -> Self {
        Self {
            x1,
            y1,
            x2,
            y2,
            confidence,
        }
    }
}

fn non_maximum_suppression(mut boxes: Vec<InternalBbox>, iou_threshold: f32) -> Vec<InternalBbox> {
    // Sort highest confidence first, so each kept box suppresses weaker overlaps.
    boxes.sort_by(|a, b| {
        b.confidence
            .partial_cmp(&a.confidence)
            .unwrap_or(Ordering::Equal)
    });
    let mut keep = Vec::new();
    while !boxes.is_empty() {
        let current = boxes.remove(0);
        keep.push(current.clone());
        boxes.retain(|box_| calculate_iou(&current, box_) <= iou_threshold);
    }
    keep
}

fn calculate_iou(box1: &InternalBbox, box2: &InternalBbox) -> f32 {
    let x1 = box1.x1.max(box2.x1);
    let y1 = box1.y1.max(box2.y1);
    let x2 = box1.x2.min(box2.x2);
    let y2 = box1.y2.min(box2.y2);
    let intersection = (x2 - x1).max(0.0) * (y2 - y1).max(0.0);
    let area1 = (box1.x2 - box1.x1) * (box1.y2 - box1.y1);
    let area2 = (box2.x2 - box2.x1) * (box2.y2 - box2.y1);
    let union = area1 + area2 - intersection;
    intersection / union
}
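
A minimal smoke test of the local detector, assuming a YOLO-style face model whose output rows are (cx, cy, w, h, confidence) at 640x640; the file paths are hypothetical and `CoreError` is assumed to implement std::error::Error:

use libertas_core::ai::FaceDetector;
use libertas_infra::ai::tract_detector::TractFaceDetector;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let detector = TractFaceDetector::new("ai_models/face.onnx")?;
    let bytes = tokio::fs::read("media_library/portrait.jpg").await?;

    for b in detector.detect_faces(&bytes).await? {
        // Coordinates are pixels in the original image, after letterbox reversal.
        println!("face: ({:.0}, {:.0})..({:.0}, {:.0})", b.x_min, b.y_min, b.x_max, b.y_max);
    }
    Ok(())
}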

libertas_infra/src/lib.rs

@@ -1,5 +1,6 @@
-pub mod factory;
-pub mod repositories;
+pub mod ai;
 pub mod db_models;
+pub mod factory;
 pub mod mappers;
 pub mod query_builder;
+pub mod repositories;

libertas_worker/Cargo.toml

@@ -8,7 +8,7 @@ libertas_core = { path = "../libertas_core" }
 libertas_infra = { path = "../libertas_infra" }
 anyhow = "1.0.100"
-async-nats = "0.44.2"
+async-nats = "0.45.0"
 serde = { version = "1.0.228", features = ["derive"] }
 serde_json = "1.0.145"
 tokio = { version = "1.48.0", features = ["full"] }

libertas_worker/src/main.rs

@@ -59,10 +59,10 @@ async fn main() -> anyhow::Result<()> {
     });

     println!("Plugin context created.");

-    let plugin_manager = Arc::new(PluginManager::new());
     let nats_client = async_nats::connect(&config.broker_url).await?;
+    let plugin_manager = Arc::new(PluginManager::new(nats_client.clone(), config.clone()));

     println!("Connected to NATS server at {}", config.broker_url);
     let mut sub_new = nats_client

libertas_worker/src/plugin_manager.rs

@@ -1,12 +1,19 @@
 use std::sync::Arc;

 use libertas_core::{
+    ai::FaceDetector,
+    config::{AiConfig, AppConfig, FaceDetectorRuntime},
     error::{CoreError, CoreResult},
     models::Media,
     plugins::{MediaProcessorPlugin, PluginContext},
 };
+use libertas_infra::ai::{
+    remote_detector::RemoteNatsFaceDetector, tract_detector::TractFaceDetector,
+};

 use crate::plugins::{
-    exif_reader::ExifReaderPlugin, thumbnail::ThumbnailPlugin, xmp_writer::XmpWriterPlugin,
+    exif_reader::ExifReaderPlugin, face_detector::FaceDetectionPlugin, thumbnail::ThumbnailPlugin,
+    xmp_writer::XmpWriterPlugin,
 };

 pub struct PluginManager {

@@ -14,9 +21,21 @@ pub struct PluginManager {
 }

 impl PluginManager {
-    pub fn new() -> Self {
+    pub fn new(nats_client: async_nats::Client, config: AppConfig) -> Self {
         let mut plugins: Vec<Arc<dyn MediaProcessorPlugin>> = Vec::new();

+        if let Some(ai_config) = &config.ai_config {
+            match build_face_detector(ai_config, nats_client) {
+                Ok(detector) => {
+                    plugins.push(Arc::new(FaceDetectionPlugin::new(detector)));
+                    println!("FaceDetectionPlugin loaded.");
+                }
+                Err(e) => {
+                    eprintln!("Failed to load FaceDetectionPlugin: {}", e);
+                }
+            }
+        }
+
         plugins.push(Arc::new(ExifReaderPlugin));
         plugins.push(Arc::new(ThumbnailPlugin));
         plugins.push(Arc::new(XmpWriterPlugin));

@@ -40,3 +59,30 @@ impl PluginManager {
         println!("PluginManager finished processing media: {}", media.id);
     }
 }
+
+fn build_face_detector(
+    config: &AiConfig,
+    nats_client: async_nats::Client,
+) -> CoreResult<Box<dyn FaceDetector>> {
+    match &config.face_detector_runtime {
+        FaceDetectorRuntime::Tract => {
+            let model_path = config
+                .face_detector_model_path
+                .as_deref()
+                .ok_or(CoreError::Config(
+                    "Tract runtime needs 'face_detector_model_path'".to_string(),
+                ))?;
+            Ok(Box::new(TractFaceDetector::new(model_path)?))
+        }
+        FaceDetectorRuntime::Onnx => {
+            unimplemented!("ONNX face detector not implemented yet");
+        }
+        FaceDetectorRuntime::RemoteNats { subject } => Ok(Box::new(RemoteNatsFaceDetector::new(
+            nats_client.clone(),
+            subject,
+        ))),
+    }
+}

libertas_worker/src/plugins/face_detector.rs (new file, 73 lines)

@@ -0,0 +1,73 @@
use std::path::PathBuf;

use async_trait::async_trait;
use libertas_core::{
    ai::FaceDetector,
    error::CoreResult,
    models::{FaceRegion, Media},
    plugins::{MediaProcessorPlugin, PluginContext, PluginData},
};
use tokio::fs;

pub struct FaceDetectionPlugin {
    detector: Box<dyn FaceDetector>,
}

impl FaceDetectionPlugin {
    pub fn new(detector: Box<dyn FaceDetector>) -> Self {
        Self { detector }
    }
}

#[async_trait]
impl MediaProcessorPlugin for FaceDetectionPlugin {
    fn name(&self) -> &'static str {
        "FaceDetectionPlugin"
    }

    async fn process(&self, media: &Media, context: &PluginContext) -> CoreResult<PluginData> {
        let start_time = std::time::Instant::now();

        if !media.mime_type.starts_with("image/") {
            return Ok(PluginData {
                message: "Not an image, skipping.".to_string(),
            });
        }

        let file_path = PathBuf::from(&context.media_library_path).join(&media.storage_path);
        let image_bytes = fs::read(file_path).await?;
        let boxes = self.detector.detect_faces(&image_bytes).await?;

        if boxes.is_empty() {
            return Ok(PluginData {
                message: "No faces detected.".to_string(),
            });
        }

        let face_regions: Vec<FaceRegion> = boxes
            .into_iter()
            .map(|b| FaceRegion {
                id: uuid::Uuid::new_v4(),
                media_id: media.id,
                person_id: None,
                x_min: b.x_min,
                y_min: b.y_min,
                x_max: b.x_max,
                y_max: b.y_max,
            })
            .collect();

        context.face_region_repo.create_batch(&face_regions).await?;

        let duration = start_time.elapsed();
        println!("Face detection took: {:?}", duration);

        Ok(PluginData {
            message: format!(
                "Successfully detected and saved {} faces.",
                face_regions.len()
            ),
        })
    }
}

libertas_worker/src/plugins/mod.rs

@@ -1,3 +1,4 @@
 pub mod exif_reader;
+pub mod face_detector;
+pub mod thumbnail;
 pub mod xmp_writer;
-pub mod thumbnail;

libertas_worker/src/plugins/xmp_writer.rs

@@ -15,6 +15,8 @@ use xmp_toolkit::{
 pub struct XmpWriterPlugin;

+const MWG_RS: &str = "http://www.metadataworkinggroup.com/schemas/regions/";
+
 #[async_trait]
 impl MediaProcessorPlugin for XmpWriterPlugin {
     fn name(&self) -> &'static str {
@@ -57,17 +59,15 @@ impl MediaProcessorPlugin for XmpWriterPlugin {
     }

     if !tags.is_empty() {
-        xmp.set_property(DC, "subject", &XmpValue::from("[]"))
-            .map_err(|e| {
-                CoreError::Unknown(format!("Failed to create subject array in XMP: {}", e))
-            })?;
         for tag in tags {
             add_xmp_array_item(&mut xmp, DC, "subject", &tag.name)?;
         }
     }

-    write_face_regions(&mut xmp, &faces, context).await?;
+    if let Err(e) = write_face_regions(&mut xmp, &faces, context).await {
+        println!("Warning: Failed to write face regions to XMP: {}", e);
+        println!("Continuing without face region data.");
+    }

     let xmp_str = xmp.to_string();
@@ -86,7 +86,9 @@ fn set_xmp_prop(xmp: &mut XmpMeta, ns: &str, key: &str, value: &str) -> CoreResu
 }

 fn add_xmp_array_item(xmp: &mut XmpMeta, ns: &str, key: &str, value: &str) -> CoreResult<()> {
-    xmp.append_array_item(ns, &XmpValue::from(key), &XmpValue::from(value))
+    let array_name_val = XmpValue::from(key).set_is_array(true).set_is_ordered(true);
+    xmp.append_array_item(ns, &array_name_val, &XmpValue::from(value))
         .map_err(|e| {
             CoreError::Unknown(format!(
                 "Failed to append item to {}:{} array in XMP: {}",
@@ -105,11 +107,13 @@ async fn write_face_regions(
         return Ok(());
     }

-    XmpMeta::register_namespace("", "mwg-rs")
+    XmpMeta::register_namespace(MWG_RS, "mwg-rs")
         .map_err(|e| CoreError::Unknown(format!("Failed to register MWG namespace: {}", e)))?;

-    xmp.set_property("", "mwg-rs:Regions", &XmpValue::from("[]"))
-        .map_err(|e| CoreError::Unknown(format!("Failed to create Regions array in XMP: {}", e)))?;
+    let regions_array_name = XmpValue::from("Regions")
+        .set_is_array(true)
+        .set_is_ordered(true);
+    let item_struct = XmpValue::from("[]").set_is_struct(true);

     for face in faces {
         let mut person_name = "Unknown".to_string();
let mut person_name = "Unknown".to_string();
@@ -119,14 +123,18 @@ async fn write_face_regions(
             }
         }

-        let region_path = format!("Regions[last()]/mwg-rs:RegionInfo/{{ {} }}", face.id);
-        xmp.set_property("mwg-rs", &region_path, &XmpValue::from("[]"))
+        xmp.append_array_item(MWG_RS, &regions_array_name, &item_struct)
             .map_err(|e| {
-                CoreError::Unknown(format!("Failed to create RegionInfo in XMP: {}", e))
+                CoreError::Unknown(format!("Failed to append Regions array item in XMP: {}", e))
             })?;

-        let name_path = format!("{}/mwg-rs:Name", region_path);
-        set_xmp_prop(xmp, "mwg-rs", &name_path, &person_name)?;
+        let region_struct_path = "Regions[last()]";
+        let id_path = format!("{}/RegionId", region_struct_path);
+        set_xmp_prop(xmp, MWG_RS, &id_path, &face.id.to_string())?;
+        let name_path = format!("{}/Name", region_struct_path);
+        set_xmp_prop(xmp, MWG_RS, &name_path, &person_name)?;

         let area_str = format!(
             "{}, {}, {}, {}",
@@ -135,11 +143,11 @@ async fn write_face_regions(
             face.x_max - face.x_min, // Width
             face.y_max - face.y_min  // Height
         );
-        let area_path = format!("{}/mwg-rs:Area", region_path);
-        set_xmp_prop(xmp, "mwg-rs", &area_path, &area_str)?;
+        let area_path = format!("{}/Area", region_struct_path);
+        set_xmp_prop(xmp, MWG_RS, &area_path, &area_str)?;

-        let type_path = format!("{}/mwg-rs:Type", region_path);
-        set_xmp_prop(xmp, "mwg-rs", &type_path, "Face")?;
+        let type_path = format!("{}/Type", region_struct_path);
+        set_xmp_prop(xmp, MWG_RS, &type_path, "Face")?;
     }

     Ok(())