feat: Refactor media service to remove extracted EXIF data handling and update job payload with thumbnail path
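The core of the change is the `media.deleted` job contract: the API now publishes the thumbnail path alongside the storage path, and the worker deserializes both. Below is a standalone sketch of that round trip; the field names and the `MediaDeletedJob` shape are taken from the diff that follows, while the example paths are purely illustrative.

use serde::Deserialize;
use serde_json::json;

// Worker-side shape, matching the MediaDeletedJob struct added in this commit.
#[derive(Debug, Deserialize)]
struct MediaDeletedJob {
    storage_path: String,
    thumbnail_path: Option<String>, // None for media that never got a thumbnail
}

fn main() -> Result<(), serde_json::Error> {
    // API side: the payload now carries both paths (values here are made up).
    let payload = json!({
        "storage_path": "2024/05/example.jpg",
        "thumbnail_path": "thumbnails/example.webp",
    });

    // Worker side: deserialize and decide whether a thumbnail needs deleting.
    let job: MediaDeletedJob = serde_json::from_value(payload)?;
    assert_eq!(job.thumbnail_path.as_deref(), Some("thumbnails/example.webp"));
    println!("delete {} (thumbnail: {:?})", job.storage_path, job.thumbnail_path);
    Ok(())
}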
@@ -9,8 +9,8 @@ use libertas_core::{
     authz,
     config::AppConfig,
     error::{CoreError, CoreResult},
-    media_utils::{ExtractedExif, extract_exif_data_from_bytes, get_storage_path_and_date},
-    models::{Media, MediaBundle, MediaMetadata},
+    media_utils::{extract_exif_data_from_bytes, get_storage_path_and_date},
+    models::{Media, MediaBundle},
     repositories::{MediaMetadataRepository, MediaRepository, UserRepository},
     schema::{ListMediaOptions, UploadMediaData},
     services::{AuthorizationService, MediaService},
@@ -81,7 +81,6 @@ impl MediaService for MediaServiceImpl {
                 storage_path_str,
                 hash,
                 file_size,
-                extracted_data,
             )
             .await?;
 
@@ -161,7 +160,8 @@ impl MediaService for MediaServiceImpl {
             .update_storage_used(user.id, -file_size)
             .await?;
 
-        let job_payload = json!({ "storage_path": media.storage_path });
+        let job_payload =
+            json!({ "storage_path": media.storage_path, "thumbnail_path": media.thumbnail_path });
         self.nats_client
             .publish("media.deleted".to_string(), job_payload.to_string().into())
             .await
@@ -242,7 +242,6 @@ impl MediaServiceImpl {
         storage_path: String,
         hash: String,
         file_size: i64,
-        extracted_data: ExtractedExif,
     ) -> CoreResult<Media> {
         let media_model = Media {
             id: Uuid::new_v4(),
@@ -257,21 +256,6 @@ impl MediaServiceImpl {
 
         self.repo.create(&media_model).await?;
 
-        let mut metadata_models = Vec::new();
-        for (source, tag_name, tag_value) in extracted_data.all_tags {
-            metadata_models.push(MediaMetadata {
-                id: Uuid::new_v4(),
-                media_id: media_model.id,
-                source,
-                tag_name,
-                tag_value,
-            });
-        }
-
-        if !metadata_models.is_empty() {
-            self.metadata_repo.create_batch(&metadata_models).await?;
-        }
-
         self.user_repo
             .update_storage_used(owner_id, file_size)
             .await?;
(next file in this commit; filename not shown in this view)
@@ -6,9 +6,16 @@ use std::{
 
 use clap::Parser;
 use libertas_core::{
-    config::AppConfig, error::{CoreError, CoreResult}, media_utils::{extract_exif_data, get_storage_path_and_date}, models::{Media, MediaMetadata, User}, repositories::{MediaMetadataRepository, MediaRepository, UserRepository}
+    config::AppConfig,
+    error::{CoreError, CoreResult},
+    media_utils::{extract_exif_data, get_storage_path_and_date},
+    models::{Media, MediaMetadata, User},
+    repositories::{MediaMetadataRepository, MediaRepository, UserRepository},
 };
-use libertas_infra::factory::{build_database_pool, build_media_metadata_repository, build_media_repository, build_user_repository};
+use libertas_infra::factory::{
+    build_database_pool, build_media_metadata_repository, build_media_repository,
+    build_user_repository,
+};
 use serde_json;
 use sha2::{Digest, Sha256};
 use tokio::fs;
@@ -61,7 +68,7 @@ async fn main() -> Result<()> {
         nats_client,
     };
 
-    let user = state
+    let mut user = state
         .user_repo
         .find_by_username(&cli.username)
         .await?
@@ -77,7 +84,7 @@ async fn main() -> Result<()> {
         if entry.file_type().is_file() {
             let path = entry.path();
 
-            match process_file(path, &user, &state).await {
+            match process_file(path, &mut user, &state).await {
                 Ok(media) => {
                     println!("-> Imported: '{}'", media.original_filename);
                 }
@@ -93,7 +100,11 @@ async fn main() -> Result<()> {
     Ok(())
 }
 
-async fn process_file(file_path: &Path, user: &User, state: &ImporterState) -> CoreResult<Media> {
+async fn process_file(
+    file_path: &Path,
+    user: &mut User,
+    state: &ImporterState,
+) -> CoreResult<Media> {
     let file_bytes = fs::read(file_path).await?;
     let file_size = file_bytes.len() as i64;
     let hash = format!("{:x}", Sha256::digest(&file_bytes));
@@ -103,10 +114,10 @@ async fn process_file(file_path: &Path, user: &User, state: &ImporterState) -> CoreResult<Media> {
         .to_string_lossy()
         .to_string();
 
-    let user_after_check = state.user_repo.find_by_id(user.id).await?.unwrap();
-    if &user_after_check.storage_used + file_size > user_after_check.storage_quota {
+    if user.storage_used + file_size > user.storage_quota {
         return Err(CoreError::Auth("Storage quota exceeded".to_string()));
     }
 
     if state.media_repo.find_by_hash(&hash).await?.is_some() {
         return Err(CoreError::Duplicate(
             "A file with this content already exists".to_string(),
@@ -115,10 +126,7 @@ async fn process_file(file_path: &Path, user: &User, state: &ImporterState) -> CoreResult<Media> {
 
     let extracted_data = match extract_exif_data(file_path).await {
         Ok(data) => {
-            println!(
-                " -> Parsed metadata: Tags={}",
-                data.all_tags.len()
-            );
+            println!(" -> Parsed metadata: Tags={}", data.all_tags.len());
             data
         }
         Err(e) => {
@@ -131,8 +139,7 @@ async fn process_file(file_path: &Path, user: &User, state: &ImporterState) -> CoreResult<Media> {
         }
     };
 
-    let (storage_path_buf, _date_taken) =
-        get_storage_path_and_date(&extracted_data, &filename);
+    let (storage_path_buf, _date_taken) = get_storage_path_and_date(&extracted_data, &filename);
 
     let mut dest_path_buf = PathBuf::from(&state.config.media_library_path);
     dest_path_buf.push(&storage_path_buf);
@@ -146,7 +153,7 @@ async fn process_file(file_path: &Path, user: &User, state: &ImporterState) -> CoreResult<Media> {
     fs::copy(file_path, &dest_path_buf).await?;
 
     let storage_path_str = storage_path_buf.to_string_lossy().to_string();
 
     let mime_type = mime_guess::from_path(file_path)
         .first_or_octet_stream()
         .to_string();
@@ -175,18 +182,16 @@ async fn process_file(file_path: &Path, user: &User, state: &ImporterState) -> CoreResult<Media> {
     }
 
     if !metadata_models.is_empty() {
-        state
-            .metadata_repo
-            .create_batch(&metadata_models)
-            .await?;
+        state.metadata_repo.create_batch(&metadata_models).await?;
     }
 
     state
        .user_repo
        .update_storage_used(user.id, file_size)
        .await?;
 
+    user.storage_used += file_size;
 
     let job_payload = serde_json::json!({ "media_id": media_model.id });
     state
         .nats_client
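The hunks above switch the importer from re-fetching the user row before every quota check to threading one mutable `User` through `process_file` and bumping its `storage_used` counter in memory after each import. A minimal standalone sketch of that pattern follows; the `User` struct here is a stand-in with only the two fields the diff touches, and the real importer additionally persists the new usage via `update_storage_used`.

struct User {
    storage_used: i64,
    storage_quota: i64,
}

// Reject the file if it would push the locally tracked usage past the quota,
// otherwise record the new usage in memory (as the new process_file does).
fn reserve(user: &mut User, file_size: i64) -> Result<(), String> {
    if user.storage_used + file_size > user.storage_quota {
        return Err("Storage quota exceeded".to_string());
    }
    user.storage_used += file_size;
    Ok(())
}

fn main() {
    let mut user = User { storage_used: 900, storage_quota: 1_000 };
    assert!(reserve(&mut user, 50).is_ok());   // 900 + 50 fits the quota
    assert!(reserve(&mut user, 100).is_err()); // 950 + 100 would exceed it
}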
(next file in this commit; filename not shown in this view)
@@ -1,4 +1,7 @@
-use libertas_core::{error::{CoreError, CoreResult}, schema::{ListMediaOptions, SortOrder}};
+use libertas_core::{
+    error::{CoreError, CoreResult},
+    schema::{ListMediaOptions, SortOrder},
+};
 use sqlx::QueryBuilder as SqlxQueryBuilder;
 
 pub trait QueryBuilder<T> {
@@ -50,7 +53,7 @@ impl QueryBuilder<ListMediaOptions> for MediaQueryBuilder {
         if !metadata_filters.is_empty() {
             metadata_filter_count = metadata_filters.len();
 
-            query.push(" JOIN media_metadata mm ON media.id == mm.media_id ");
+            query.push(" JOIN media_metadata mm ON media.id = mm.media_id ");
             query.push(" AND ( ");
 
             for (i, filter) in metadata_filters.iter().enumerate() {
@@ -77,7 +80,7 @@ impl QueryBuilder<ListMediaOptions> for MediaQueryBuilder {
 
         if let Some(sort) = &options.sort {
             let column = self.validate_sort_column(&sort.sort_by)?;
 
             let direction = match sort.sort_order {
                 SortOrder::Asc => "ASC",
                 SortOrder::Desc => "DESC",
@@ -91,9 +94,8 @@ impl QueryBuilder<ListMediaOptions> for MediaQueryBuilder {
 
             let order_by_clause = format!("ORDER BY {} {} {}", column, direction, nulls_order);
             query.push(order_by_clause);
-
         } else {
-            query.push("ORDER BY date_taken DESC NULLS FIRST");
+            query.push(" ORDER BY media.created_at DESC NULLS LAST ");
         }
 
         // --- 3. Apply Pagination (Future-Proofing Stub) ---
@@ -106,4 +108,4 @@ impl QueryBuilder<ListMediaOptions> for MediaQueryBuilder {
 
         Ok(query)
     }
 }
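For reference, the two fragment fixes above change the generated SQL roughly as follows. This is a small sketch using sqlx's QueryBuilder; the base SELECT is an assumption, and only the pushed fragments come from the diff.

use sqlx::{Postgres, QueryBuilder};

fn main() {
    // Stand-in base query; the real builder also assembles filters and pagination.
    let mut query: QueryBuilder<Postgres> = QueryBuilder::new("SELECT media.* FROM media");

    // Join condition now uses a single '=' instead of the invalid '=='.
    query.push(" JOIN media_metadata mm ON media.id = mm.media_id ");

    // New default ordering when no explicit sort option is supplied.
    query.push(" ORDER BY media.created_at DESC NULLS LAST ");

    println!("{}", query.sql());
}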
(next file in this commit; filename not shown in this view)
@@ -22,9 +22,11 @@ struct MediaJob {
     media_id: Uuid,
 }
 
+//TODO: move this to a core crate and make sure that api uses it too, this will allow us to type safely pass messages
 #[derive(Deserialize)]
 struct MediaDeletedJob {
     storage_path: String,
+    thumbnail_path: Option<String>,
 }
 
 #[tokio::main]
@@ -131,5 +133,12 @@ async fn process_deleted_job(
         println!("Failed to delete XMP sidecar: {}", e);
     }
 
+    if let Some(thumbnail_path) = payload.thumbnail_path {
+        let thumbnail_full_path = PathBuf::from(&context.media_library_path).join(thumbnail_path);
+        if let Err(e) = fs::remove_file(thumbnail_full_path).await {
+            println!("Failed to delete thumbnail: {}", e);
+        }
+    }
+
     Ok(())
 }
(next file in this commit; filename not shown in this view)
@@ -5,7 +5,9 @@ use libertas_core::{
     plugins::{MediaProcessorPlugin, PluginContext},
 };
 
-use crate::plugins::{thumbnail::ThumbnailPlugin, xmp_writer::XmpWriterPlugin};
+use crate::plugins::{
+    exif_reader::ExifReaderPlugin, thumbnail::ThumbnailPlugin, xmp_writer::XmpWriterPlugin,
+};
 
 pub struct PluginManager {
     plugins: Vec<Arc<dyn MediaProcessorPlugin>>,
@@ -15,7 +17,7 @@ impl PluginManager {
     pub fn new() -> Self {
         let mut plugins: Vec<Arc<dyn MediaProcessorPlugin>> = Vec::new();
 
-        // plugins.push(Arc::new(ExifReaderPlugin)); temporarily disabled due to duplicate metadata extraction (libertas_api already does this, needs refactor)
+        plugins.push(Arc::new(ExifReaderPlugin));
         plugins.push(Arc::new(ThumbnailPlugin));
         plugins.push(Arc::new(XmpWriterPlugin));
 