diff --git a/libertas_core/src/models.rs b/libertas_core/src/models.rs index 8b2c503..7f17324 100644 --- a/libertas_core/src/models.rs +++ b/libertas_core/src/models.rs @@ -49,6 +49,7 @@ impl From<&str> for MediaMetadataSource { } } +#[derive(Clone)] pub struct Media { pub id: uuid::Uuid, pub owner_id: uuid::Uuid, @@ -148,7 +149,6 @@ pub struct AlbumShare { pub permission: AlbumPermission, } - pub struct MediaBundle { pub media: Media, pub metadata: Vec, @@ -188,4 +188,4 @@ pub struct PersonShare { pub person_id: uuid::Uuid, pub user_id: uuid::Uuid, pub permission: PersonPermission, -} \ No newline at end of file +} diff --git a/libertas_core/src/repositories.rs b/libertas_core/src/repositories.rs index d58d358..07d986c 100644 --- a/libertas_core/src/repositories.rs +++ b/libertas_core/src/repositories.rs @@ -7,7 +7,7 @@ use crate::{ Album, AlbumPermission, FaceRegion, Media, MediaMetadata, Person, PersonPermission, Tag, User, }, - schema::ListMediaOptions, + schema::{ListMediaOptions, MediaImportBundle}, }; #[async_trait] @@ -121,3 +121,8 @@ pub trait PersonShareRepository: Send + Sync { user_id: Uuid, ) -> CoreResult>; } + +#[async_trait] +pub trait MediaImportRepository: Send + Sync { + async fn create_media_bundle(&self, bundle: MediaImportBundle) -> CoreResult<()>; +} diff --git a/libertas_core/src/schema.rs b/libertas_core/src/schema.rs index 84cc9ab..6ed3a2a 100644 --- a/libertas_core/src/schema.rs +++ b/libertas_core/src/schema.rs @@ -1,4 +1,4 @@ -use crate::models::AlbumPermission; +use crate::models::{AlbumPermission, Media, MediaMetadata}; pub struct UploadMediaData<'a> { pub owner_id: uuid::Uuid, @@ -74,4 +74,10 @@ pub struct ListMediaOptions { pub struct MetadataFilter { pub tag_name: String, pub tag_value: String, -} \ No newline at end of file +} + +pub struct MediaImportBundle { + pub media_model: Media, + pub metadata_models: Vec, + pub file_size: i64, +} diff --git a/libertas_importer/src/main.rs b/libertas_importer/src/main.rs index 2ed818e..7aa2d64 100644 --- a/libertas_importer/src/main.rs +++ b/libertas_importer/src/main.rs @@ -10,10 +10,11 @@ use libertas_core::{ error::{CoreError, CoreResult}, media_utils::{extract_exif_data, get_storage_path_and_date}, models::{Media, MediaMetadata, User}, - repositories::{MediaMetadataRepository, MediaRepository, UserRepository}, + repositories::{MediaImportRepository, MediaRepository, UserRepository}, + schema::MediaImportBundle, }; use libertas_infra::factory::{ - build_database_pool, build_media_metadata_repository, build_media_repository, + build_database_pool, build_media_import_repository, build_media_repository, build_user_repository, }; use serde_json; @@ -39,7 +40,7 @@ struct ImporterState { config: AppConfig, media_repo: Arc, user_repo: Arc, - metadata_repo: Arc, + media_import_repo: Arc, nats_client: async_nats::Client, } @@ -55,7 +56,8 @@ async fn main() -> Result<()> { let db_pool = build_database_pool(&config.database).await?; let media_repo = build_media_repository(&config, db_pool.clone()).await?; let user_repo = build_user_repository(&config.database, db_pool.clone()).await?; - let metadata_repo = build_media_metadata_repository(&config.database, db_pool.clone()).await?; + let media_import_repo = + build_media_import_repository(&config.database, db_pool.clone()).await?; let nats_client = async_nats::connect(&config.broker_url).await?; println!("Connected to database and NATS broker."); @@ -64,7 +66,7 @@ async fn main() -> Result<()> { config, media_repo, user_repo, - metadata_repo, + media_import_repo, nats_client, }; @@ -168,7 +170,6 @@ async fn process_file( created_at: chrono::Utc::now(), thumbnail_path: None, }; - state.media_repo.create(&media_model).await?; let mut metadata_models = Vec::new(); for (source, tag_name, tag_value) in extracted_data.all_tags { @@ -181,14 +182,13 @@ async fn process_file( }); } - if !metadata_models.is_empty() { - state.metadata_repo.create_batch(&metadata_models).await?; - } + let bundle = MediaImportBundle { + media_model: media_model.clone(), + metadata_models, + file_size, + }; - state - .user_repo - .update_storage_used(user.id, file_size) - .await?; + state.media_import_repo.create_media_bundle(bundle).await?; user.storage_used += file_size; diff --git a/libertas_infra/src/factory.rs b/libertas_infra/src/factory.rs index 2ea948e..c2e413a 100644 --- a/libertas_infra/src/factory.rs +++ b/libertas_infra/src/factory.rs @@ -152,10 +152,28 @@ pub async fn build_person_share_repository( ) -> CoreResult> { match pool { DatabasePool::Postgres(pg_pool) => Ok(Arc::new( - crate::repositories::person_share_repository::PostgresPersonShareRepository::new(pg_pool), + crate::repositories::person_share_repository::PostgresPersonShareRepository::new( + pg_pool, + ), )), DatabasePool::Sqlite(_sqlite_pool) => Err(CoreError::Database( "Sqlite person share repository not implemented".to_string(), )), } -} \ No newline at end of file +} + +pub async fn build_media_import_repository( + _db_config: &DatabaseConfig, + pool: DatabasePool, +) -> CoreResult> { + match pool { + DatabasePool::Postgres(pg_pool) => Ok(Arc::new( + crate::repositories::media_import_repository::PostgresMediaImportRepository::new( + pg_pool, + ), + )), + DatabasePool::Sqlite(_sqlite_pool) => Err(CoreError::Database( + "Sqlite media import repository not implemented".to_string(), + )), + } +} diff --git a/libertas_infra/src/repositories/media_import_repository.rs b/libertas_infra/src/repositories/media_import_repository.rs new file mode 100644 index 0000000..07dba65 --- /dev/null +++ b/libertas_infra/src/repositories/media_import_repository.rs @@ -0,0 +1,57 @@ +use async_trait::async_trait; +use libertas_core::{ + error::{CoreError, CoreResult}, + repositories::MediaImportRepository, + schema::MediaImportBundle, +}; +use sqlx::PgPool; + +use crate::repositories::{ + media_metadata_repository::PostgresMediaMetadataRepository, + media_repository::PostgresMediaRepository, user_repository::PostgresUserRepository, +}; + +#[derive(Clone)] +pub struct PostgresMediaImportRepository { + pool: PgPool, +} + +impl PostgresMediaImportRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl MediaImportRepository for PostgresMediaImportRepository { + async fn create_media_bundle(&self, bundle: MediaImportBundle) -> CoreResult<()> { + let mut tx = self + .pool + .begin() + .await + .map_err(|e| CoreError::Database(e.to_string()))?; + + PostgresMediaRepository::create_internal(&mut *tx, &bundle.media_model).await?; + + if !bundle.metadata_models.is_empty() { + PostgresMediaMetadataRepository::create_batch_internal( + &mut *tx, + &bundle.metadata_models, + ) + .await?; + } + + PostgresUserRepository::update_storage_used_internal( + &mut *tx, + bundle.media_model.owner_id, + bundle.file_size as i64, + ) + .await?; + + tx.commit() + .await + .map_err(|e| CoreError::Database(e.to_string()))?; + + Ok(()) + } +} diff --git a/libertas_infra/src/repositories/media_metadata_repository.rs b/libertas_infra/src/repositories/media_metadata_repository.rs index f3b9d7f..7dbc24a 100644 --- a/libertas_infra/src/repositories/media_metadata_repository.rs +++ b/libertas_infra/src/repositories/media_metadata_repository.rs @@ -1,5 +1,9 @@ use async_trait::async_trait; -use libertas_core::{error::{CoreError, CoreResult}, models::MediaMetadata, repositories::MediaMetadataRepository}; +use libertas_core::{ + error::{CoreError, CoreResult}, + models::MediaMetadata, + repositories::MediaMetadataRepository, +}; use sqlx::PgPool; use crate::db_models::{PostgresMediaMetadata, PostgresMediaMetadataSource}; @@ -12,11 +16,11 @@ impl PostgresMediaMetadataRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } -} -#[async_trait] -impl MediaMetadataRepository for PostgresMediaMetadataRepository { - async fn create_batch(&self, metadata: &[MediaMetadata]) -> CoreResult<()> { + pub(crate) async fn create_batch_internal<'a>( + exec: impl sqlx::Executor<'a, Database = sqlx::Postgres>, + metadata: &[MediaMetadata], + ) -> CoreResult<()> { if metadata.is_empty() { return Ok(()); } @@ -52,12 +56,19 @@ impl MediaMetadataRepository for PostgresMediaMetadataRepository { &tag_names, &tag_values, ) - .execute(&self.pool) + .execute(exec) .await .map_err(|e| CoreError::Database(e.to_string()))?; Ok(()) } +} + +#[async_trait] +impl MediaMetadataRepository for PostgresMediaMetadataRepository { + async fn create_batch(&self, metadata: &[MediaMetadata]) -> CoreResult<()> { + Self::create_batch_internal(&self.pool, metadata).await + } async fn find_by_media_id(&self, media_id: uuid::Uuid) -> CoreResult> { let pg_metadata = sqlx::query_as!( @@ -76,4 +87,4 @@ impl MediaMetadataRepository for PostgresMediaMetadataRepository { let metadata = pg_metadata.into_iter().map(|m| m.into()).collect(); Ok(metadata) } -} \ No newline at end of file +} diff --git a/libertas_infra/src/repositories/media_repository.rs b/libertas_infra/src/repositories/media_repository.rs index b1710a8..b8e4c9e 100644 --- a/libertas_infra/src/repositories/media_repository.rs +++ b/libertas_infra/src/repositories/media_repository.rs @@ -2,12 +2,19 @@ use std::sync::Arc; use async_trait::async_trait; use libertas_core::{ - config::AppConfig, error::{CoreError, CoreResult}, models::Media, repositories::MediaRepository, schema::ListMediaOptions + config::AppConfig, + error::{CoreError, CoreResult}, + models::Media, + repositories::MediaRepository, + schema::ListMediaOptions, }; use sqlx::PgPool; use uuid::Uuid; -use crate::{db_models::PostgresMedia, query_builder::{MediaQueryBuilder, QueryBuilder}}; +use crate::{ + db_models::PostgresMedia, + query_builder::{MediaQueryBuilder, QueryBuilder}, +}; #[derive(Clone)] pub struct PostgresMediaRepository { @@ -22,13 +29,16 @@ impl PostgresMediaRepository { .clone() .unwrap_or_else(|| vec!["created_at".to_string(), "original_filename".to_string()]); - Self { pool, query_builder: Arc::new(MediaQueryBuilder::new(allowed_columns)) } + Self { + pool, + query_builder: Arc::new(MediaQueryBuilder::new(allowed_columns)), + } } -} -#[async_trait] -impl MediaRepository for PostgresMediaRepository { - async fn create(&self, media: &Media) -> CoreResult<()> { + pub(crate) async fn create_internal<'a>( + exec: impl sqlx::Executor<'a, Database = sqlx::Postgres>, + media: &Media, + ) -> CoreResult<()> { sqlx::query!( r#" INSERT INTO media (id, owner_id, storage_path, original_filename, mime_type, hash, created_at, thumbnail_path) @@ -43,12 +53,18 @@ impl MediaRepository for PostgresMediaRepository { media.created_at, media.thumbnail_path ) - .execute(&self.pool) + .execute(exec) .await .map_err(|e| CoreError::Database(e.to_string()))?; - Ok(()) } +} + +#[async_trait] +impl MediaRepository for PostgresMediaRepository { + async fn create(&self, media: &Media) -> CoreResult<()> { + Self::create_internal(&self.pool, media).await + } async fn find_by_hash(&self, hash: &str) -> CoreResult> { let pg_media = sqlx::query_as!( @@ -86,7 +102,11 @@ impl MediaRepository for PostgresMediaRepository { Ok(pg_media.map(|m| m.into())) } - async fn list_by_user(&self, user_id: Uuid, options: &ListMediaOptions) -> CoreResult> { + async fn list_by_user( + &self, + user_id: Uuid, + options: &ListMediaOptions, + ) -> CoreResult> { let mut query = sqlx::QueryBuilder::new( r#" SELECT media.id, media.owner_id, media.storage_path, media.original_filename, media.mime_type, media.hash, media.created_at, @@ -98,16 +118,14 @@ impl MediaRepository for PostgresMediaRepository { query.push_bind(user_id); - query = self - .query_builder - .apply_options_to_query(query, options)?; + query = self.query_builder.apply_options_to_query(query, options)?; let pg_media = query - .build_query_as::() - .fetch_all(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string()))?; - + .build_query_as::() + .fetch_all(&self.pool) + .await + .map_err(|e| CoreError::Database(e.to_string()))?; + let media_list = pg_media.into_iter().map(|m| m.into()).collect(); Ok(media_list) } diff --git a/libertas_infra/src/repositories/mod.rs b/libertas_infra/src/repositories/mod.rs index f5d9d26..83df769 100644 --- a/libertas_infra/src/repositories/mod.rs +++ b/libertas_infra/src/repositories/mod.rs @@ -1,9 +1,10 @@ pub mod album_repository; pub mod album_share_repository; -pub mod media_repository; -pub mod user_repository; -pub mod media_metadata_repository; -pub mod tag_repository; -pub mod person_repository; pub mod face_region_repository; -pub mod person_share_repository; \ No newline at end of file +pub mod media_import_repository; +pub mod media_metadata_repository; +pub mod media_repository; +pub mod person_repository; +pub mod person_share_repository; +pub mod tag_repository; +pub mod user_repository; diff --git a/libertas_infra/src/repositories/user_repository.rs b/libertas_infra/src/repositories/user_repository.rs index 069ddf4..6d734be 100644 --- a/libertas_infra/src/repositories/user_repository.rs +++ b/libertas_infra/src/repositories/user_repository.rs @@ -4,7 +4,7 @@ use libertas_core::{ models::User, repositories::UserRepository, }; -use sqlx::{PgPool, SqlitePool, types::Uuid}; +use sqlx::{Executor, PgPool, Postgres, SqlitePool, types::Uuid}; use crate::db_models::PostgresUser; @@ -17,6 +17,27 @@ impl PostgresUserRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } + + pub(crate) async fn update_storage_used_internal<'a>( + exec: impl Executor<'a, Database = Postgres>, + user_id: Uuid, + bytes: i64, + ) -> CoreResult<()> { + sqlx::query!( + r#" + UPDATE users + SET storage_used = storage_used + $1, updated_at = NOW() + WHERE id = $2 + "#, + bytes, + user_id + ) + .execute(exec) + .await + .map_err(|e| CoreError::Database(e.to_string()))?; + + Ok(()) + } } #[derive(Clone)] @@ -114,20 +135,7 @@ impl UserRepository for PostgresUserRepository { } async fn update_storage_used(&self, user_id: Uuid, bytes: i64) -> CoreResult<()> { - sqlx::query!( - r#" - UPDATE users - SET storage_used = storage_used + $1, updated_at = NOW() - WHERE id = $2 - "#, - bytes, - user_id - ) - .execute(&self.pool) - .await - .map_err(|e| CoreError::Database(e.to_string()))?; - - Ok(()) + Self::update_storage_used_internal(&self.pool, user_id, bytes).await } }