feat: Implement media import functionality with repository and bundle support

This commit is contained in:
2025-11-15 15:24:52 +01:00
parent faed54cb08
commit d7b22bdcb1
10 changed files with 190 additions and 66 deletions

View File

@@ -49,6 +49,7 @@ impl From<&str> for MediaMetadataSource {
} }
} }
#[derive(Clone)]
pub struct Media { pub struct Media {
pub id: uuid::Uuid, pub id: uuid::Uuid,
pub owner_id: uuid::Uuid, pub owner_id: uuid::Uuid,
@@ -148,7 +149,6 @@ pub struct AlbumShare {
pub permission: AlbumPermission, pub permission: AlbumPermission,
} }
pub struct MediaBundle { pub struct MediaBundle {
pub media: Media, pub media: Media,
pub metadata: Vec<MediaMetadata>, pub metadata: Vec<MediaMetadata>,

View File

@@ -7,7 +7,7 @@ use crate::{
Album, AlbumPermission, FaceRegion, Media, MediaMetadata, Person, PersonPermission, Tag, Album, AlbumPermission, FaceRegion, Media, MediaMetadata, Person, PersonPermission, Tag,
User, User,
}, },
schema::ListMediaOptions, schema::{ListMediaOptions, MediaImportBundle},
}; };
#[async_trait] #[async_trait]
@@ -121,3 +121,8 @@ pub trait PersonShareRepository: Send + Sync {
user_id: Uuid, user_id: Uuid,
) -> CoreResult<Vec<(Person, PersonPermission)>>; ) -> CoreResult<Vec<(Person, PersonPermission)>>;
} }
#[async_trait]
pub trait MediaImportRepository: Send + Sync {
async fn create_media_bundle(&self, bundle: MediaImportBundle) -> CoreResult<()>;
}

View File

@@ -1,4 +1,4 @@
use crate::models::AlbumPermission; use crate::models::{AlbumPermission, Media, MediaMetadata};
pub struct UploadMediaData<'a> { pub struct UploadMediaData<'a> {
pub owner_id: uuid::Uuid, pub owner_id: uuid::Uuid,
@@ -75,3 +75,9 @@ pub struct MetadataFilter {
pub tag_name: String, pub tag_name: String,
pub tag_value: String, pub tag_value: String,
} }
pub struct MediaImportBundle {
pub media_model: Media,
pub metadata_models: Vec<MediaMetadata>,
pub file_size: i64,
}

View File

@@ -10,10 +10,11 @@ use libertas_core::{
error::{CoreError, CoreResult}, error::{CoreError, CoreResult},
media_utils::{extract_exif_data, get_storage_path_and_date}, media_utils::{extract_exif_data, get_storage_path_and_date},
models::{Media, MediaMetadata, User}, models::{Media, MediaMetadata, User},
repositories::{MediaMetadataRepository, MediaRepository, UserRepository}, repositories::{MediaImportRepository, MediaRepository, UserRepository},
schema::MediaImportBundle,
}; };
use libertas_infra::factory::{ use libertas_infra::factory::{
build_database_pool, build_media_metadata_repository, build_media_repository, build_database_pool, build_media_import_repository, build_media_repository,
build_user_repository, build_user_repository,
}; };
use serde_json; use serde_json;
@@ -39,7 +40,7 @@ struct ImporterState {
config: AppConfig, config: AppConfig,
media_repo: Arc<dyn MediaRepository>, media_repo: Arc<dyn MediaRepository>,
user_repo: Arc<dyn UserRepository>, user_repo: Arc<dyn UserRepository>,
metadata_repo: Arc<dyn MediaMetadataRepository>, media_import_repo: Arc<dyn MediaImportRepository>,
nats_client: async_nats::Client, nats_client: async_nats::Client,
} }
@@ -55,7 +56,8 @@ async fn main() -> Result<()> {
let db_pool = build_database_pool(&config.database).await?; let db_pool = build_database_pool(&config.database).await?;
let media_repo = build_media_repository(&config, db_pool.clone()).await?; let media_repo = build_media_repository(&config, db_pool.clone()).await?;
let user_repo = build_user_repository(&config.database, db_pool.clone()).await?; let user_repo = build_user_repository(&config.database, db_pool.clone()).await?;
let metadata_repo = build_media_metadata_repository(&config.database, db_pool.clone()).await?; let media_import_repo =
build_media_import_repository(&config.database, db_pool.clone()).await?;
let nats_client = async_nats::connect(&config.broker_url).await?; let nats_client = async_nats::connect(&config.broker_url).await?;
println!("Connected to database and NATS broker."); println!("Connected to database and NATS broker.");
@@ -64,7 +66,7 @@ async fn main() -> Result<()> {
config, config,
media_repo, media_repo,
user_repo, user_repo,
metadata_repo, media_import_repo,
nats_client, nats_client,
}; };
@@ -168,7 +170,6 @@ async fn process_file(
created_at: chrono::Utc::now(), created_at: chrono::Utc::now(),
thumbnail_path: None, thumbnail_path: None,
}; };
state.media_repo.create(&media_model).await?;
let mut metadata_models = Vec::new(); let mut metadata_models = Vec::new();
for (source, tag_name, tag_value) in extracted_data.all_tags { for (source, tag_name, tag_value) in extracted_data.all_tags {
@@ -181,14 +182,13 @@ async fn process_file(
}); });
} }
if !metadata_models.is_empty() { let bundle = MediaImportBundle {
state.metadata_repo.create_batch(&metadata_models).await?; media_model: media_model.clone(),
} metadata_models,
file_size,
};
state state.media_import_repo.create_media_bundle(bundle).await?;
.user_repo
.update_storage_used(user.id, file_size)
.await?;
user.storage_used += file_size; user.storage_used += file_size;

View File

@@ -152,10 +152,28 @@ pub async fn build_person_share_repository(
) -> CoreResult<Arc<dyn libertas_core::repositories::PersonShareRepository>> { ) -> CoreResult<Arc<dyn libertas_core::repositories::PersonShareRepository>> {
match pool { match pool {
DatabasePool::Postgres(pg_pool) => Ok(Arc::new( DatabasePool::Postgres(pg_pool) => Ok(Arc::new(
crate::repositories::person_share_repository::PostgresPersonShareRepository::new(pg_pool), crate::repositories::person_share_repository::PostgresPersonShareRepository::new(
pg_pool,
),
)), )),
DatabasePool::Sqlite(_sqlite_pool) => Err(CoreError::Database( DatabasePool::Sqlite(_sqlite_pool) => Err(CoreError::Database(
"Sqlite person share repository not implemented".to_string(), "Sqlite person share repository not implemented".to_string(),
)), )),
} }
} }
pub async fn build_media_import_repository(
_db_config: &DatabaseConfig,
pool: DatabasePool,
) -> CoreResult<Arc<dyn libertas_core::repositories::MediaImportRepository>> {
match pool {
DatabasePool::Postgres(pg_pool) => Ok(Arc::new(
crate::repositories::media_import_repository::PostgresMediaImportRepository::new(
pg_pool,
),
)),
DatabasePool::Sqlite(_sqlite_pool) => Err(CoreError::Database(
"Sqlite media import repository not implemented".to_string(),
)),
}
}

View File

@@ -0,0 +1,57 @@
use async_trait::async_trait;
use libertas_core::{
error::{CoreError, CoreResult},
repositories::MediaImportRepository,
schema::MediaImportBundle,
};
use sqlx::PgPool;
use crate::repositories::{
media_metadata_repository::PostgresMediaMetadataRepository,
media_repository::PostgresMediaRepository, user_repository::PostgresUserRepository,
};
#[derive(Clone)]
pub struct PostgresMediaImportRepository {
pool: PgPool,
}
impl PostgresMediaImportRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl MediaImportRepository for PostgresMediaImportRepository {
async fn create_media_bundle(&self, bundle: MediaImportBundle) -> CoreResult<()> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
PostgresMediaRepository::create_internal(&mut *tx, &bundle.media_model).await?;
if !bundle.metadata_models.is_empty() {
PostgresMediaMetadataRepository::create_batch_internal(
&mut *tx,
&bundle.metadata_models,
)
.await?;
}
PostgresUserRepository::update_storage_used_internal(
&mut *tx,
bundle.media_model.owner_id,
bundle.file_size as i64,
)
.await?;
tx.commit()
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
Ok(())
}
}

View File

@@ -1,5 +1,9 @@
use async_trait::async_trait; use async_trait::async_trait;
use libertas_core::{error::{CoreError, CoreResult}, models::MediaMetadata, repositories::MediaMetadataRepository}; use libertas_core::{
error::{CoreError, CoreResult},
models::MediaMetadata,
repositories::MediaMetadataRepository,
};
use sqlx::PgPool; use sqlx::PgPool;
use crate::db_models::{PostgresMediaMetadata, PostgresMediaMetadataSource}; use crate::db_models::{PostgresMediaMetadata, PostgresMediaMetadataSource};
@@ -12,11 +16,11 @@ impl PostgresMediaMetadataRepository {
pub fn new(pool: PgPool) -> Self { pub fn new(pool: PgPool) -> Self {
Self { pool } Self { pool }
} }
}
#[async_trait] pub(crate) async fn create_batch_internal<'a>(
impl MediaMetadataRepository for PostgresMediaMetadataRepository { exec: impl sqlx::Executor<'a, Database = sqlx::Postgres>,
async fn create_batch(&self, metadata: &[MediaMetadata]) -> CoreResult<()> { metadata: &[MediaMetadata],
) -> CoreResult<()> {
if metadata.is_empty() { if metadata.is_empty() {
return Ok(()); return Ok(());
} }
@@ -52,12 +56,19 @@ impl MediaMetadataRepository for PostgresMediaMetadataRepository {
&tag_names, &tag_names,
&tag_values, &tag_values,
) )
.execute(&self.pool) .execute(exec)
.await .await
.map_err(|e| CoreError::Database(e.to_string()))?; .map_err(|e| CoreError::Database(e.to_string()))?;
Ok(()) Ok(())
} }
}
#[async_trait]
impl MediaMetadataRepository for PostgresMediaMetadataRepository {
async fn create_batch(&self, metadata: &[MediaMetadata]) -> CoreResult<()> {
Self::create_batch_internal(&self.pool, metadata).await
}
async fn find_by_media_id(&self, media_id: uuid::Uuid) -> CoreResult<Vec<MediaMetadata>> { async fn find_by_media_id(&self, media_id: uuid::Uuid) -> CoreResult<Vec<MediaMetadata>> {
let pg_metadata = sqlx::query_as!( let pg_metadata = sqlx::query_as!(

View File

@@ -2,12 +2,19 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use libertas_core::{ use libertas_core::{
config::AppConfig, error::{CoreError, CoreResult}, models::Media, repositories::MediaRepository, schema::ListMediaOptions config::AppConfig,
error::{CoreError, CoreResult},
models::Media,
repositories::MediaRepository,
schema::ListMediaOptions,
}; };
use sqlx::PgPool; use sqlx::PgPool;
use uuid::Uuid; use uuid::Uuid;
use crate::{db_models::PostgresMedia, query_builder::{MediaQueryBuilder, QueryBuilder}}; use crate::{
db_models::PostgresMedia,
query_builder::{MediaQueryBuilder, QueryBuilder},
};
#[derive(Clone)] #[derive(Clone)]
pub struct PostgresMediaRepository { pub struct PostgresMediaRepository {
@@ -22,13 +29,16 @@ impl PostgresMediaRepository {
.clone() .clone()
.unwrap_or_else(|| vec!["created_at".to_string(), "original_filename".to_string()]); .unwrap_or_else(|| vec!["created_at".to_string(), "original_filename".to_string()]);
Self { pool, query_builder: Arc::new(MediaQueryBuilder::new(allowed_columns)) } Self {
pool,
query_builder: Arc::new(MediaQueryBuilder::new(allowed_columns)),
} }
} }
#[async_trait] pub(crate) async fn create_internal<'a>(
impl MediaRepository for PostgresMediaRepository { exec: impl sqlx::Executor<'a, Database = sqlx::Postgres>,
async fn create(&self, media: &Media) -> CoreResult<()> { media: &Media,
) -> CoreResult<()> {
sqlx::query!( sqlx::query!(
r#" r#"
INSERT INTO media (id, owner_id, storage_path, original_filename, mime_type, hash, created_at, thumbnail_path) INSERT INTO media (id, owner_id, storage_path, original_filename, mime_type, hash, created_at, thumbnail_path)
@@ -43,12 +53,18 @@ impl MediaRepository for PostgresMediaRepository {
media.created_at, media.created_at,
media.thumbnail_path media.thumbnail_path
) )
.execute(&self.pool) .execute(exec)
.await .await
.map_err(|e| CoreError::Database(e.to_string()))?; .map_err(|e| CoreError::Database(e.to_string()))?;
Ok(()) Ok(())
} }
}
#[async_trait]
impl MediaRepository for PostgresMediaRepository {
async fn create(&self, media: &Media) -> CoreResult<()> {
Self::create_internal(&self.pool, media).await
}
async fn find_by_hash(&self, hash: &str) -> CoreResult<Option<Media>> { async fn find_by_hash(&self, hash: &str) -> CoreResult<Option<Media>> {
let pg_media = sqlx::query_as!( let pg_media = sqlx::query_as!(
@@ -86,7 +102,11 @@ impl MediaRepository for PostgresMediaRepository {
Ok(pg_media.map(|m| m.into())) Ok(pg_media.map(|m| m.into()))
} }
async fn list_by_user(&self, user_id: Uuid, options: &ListMediaOptions) -> CoreResult<Vec<Media>> { async fn list_by_user(
&self,
user_id: Uuid,
options: &ListMediaOptions,
) -> CoreResult<Vec<Media>> {
let mut query = sqlx::QueryBuilder::new( let mut query = sqlx::QueryBuilder::new(
r#" r#"
SELECT media.id, media.owner_id, media.storage_path, media.original_filename, media.mime_type, media.hash, media.created_at, SELECT media.id, media.owner_id, media.storage_path, media.original_filename, media.mime_type, media.hash, media.created_at,
@@ -98,9 +118,7 @@ impl MediaRepository for PostgresMediaRepository {
query.push_bind(user_id); query.push_bind(user_id);
query = self query = self.query_builder.apply_options_to_query(query, options)?;
.query_builder
.apply_options_to_query(query, options)?;
let pg_media = query let pg_media = query
.build_query_as::<PostgresMedia>() .build_query_as::<PostgresMedia>()

View File

@@ -1,9 +1,10 @@
pub mod album_repository; pub mod album_repository;
pub mod album_share_repository; pub mod album_share_repository;
pub mod media_repository;
pub mod user_repository;
pub mod media_metadata_repository;
pub mod tag_repository;
pub mod person_repository;
pub mod face_region_repository; pub mod face_region_repository;
pub mod media_import_repository;
pub mod media_metadata_repository;
pub mod media_repository;
pub mod person_repository;
pub mod person_share_repository; pub mod person_share_repository;
pub mod tag_repository;
pub mod user_repository;

View File

@@ -4,7 +4,7 @@ use libertas_core::{
models::User, models::User,
repositories::UserRepository, repositories::UserRepository,
}; };
use sqlx::{PgPool, SqlitePool, types::Uuid}; use sqlx::{Executor, PgPool, Postgres, SqlitePool, types::Uuid};
use crate::db_models::PostgresUser; use crate::db_models::PostgresUser;
@@ -17,6 +17,27 @@ impl PostgresUserRepository {
pub fn new(pool: PgPool) -> Self { pub fn new(pool: PgPool) -> Self {
Self { pool } Self { pool }
} }
pub(crate) async fn update_storage_used_internal<'a>(
exec: impl Executor<'a, Database = Postgres>,
user_id: Uuid,
bytes: i64,
) -> CoreResult<()> {
sqlx::query!(
r#"
UPDATE users
SET storage_used = storage_used + $1, updated_at = NOW()
WHERE id = $2
"#,
bytes,
user_id
)
.execute(exec)
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
Ok(())
}
} }
#[derive(Clone)] #[derive(Clone)]
@@ -114,20 +135,7 @@ impl UserRepository for PostgresUserRepository {
} }
async fn update_storage_used(&self, user_id: Uuid, bytes: i64) -> CoreResult<()> { async fn update_storage_used(&self, user_id: Uuid, bytes: i64) -> CoreResult<()> {
sqlx::query!( Self::update_storage_used_internal(&self.pool, user_id, bytes).await
r#"
UPDATE users
SET storage_used = storage_used + $1, updated_at = NOW()
WHERE id = $2
"#,
bytes,
user_id
)
.execute(&self.pool)
.await
.map_err(|e| CoreError::Database(e.to_string()))?;
Ok(())
} }
} }