use std::sync::Arc; use async_trait::async_trait; use libertas_core::{ config::AppConfig, error::{CoreError, CoreResult}, models::Media, repositories::MediaRepository, schema::ListMediaOptions, }; use sqlx::PgPool; use uuid::Uuid; use crate::{db_models::PostgresMedia, query_builder::MediaQueryBuilder}; #[derive(Clone)] pub struct PostgresMediaRepository { pool: PgPool, query_builder: Arc, } impl PostgresMediaRepository { pub fn new(pool: PgPool, config: &AppConfig) -> Self { let allowed_columns = config .allowed_sort_columns .clone() .unwrap_or_else(|| vec!["created_at".to_string(), "original_filename".to_string()]); Self { pool, query_builder: Arc::new(MediaQueryBuilder::new(allowed_columns)), } } pub(crate) async fn create_internal<'a>( exec: impl sqlx::Executor<'a, Database = sqlx::Postgres>, media: &Media, ) -> CoreResult<()> { sqlx::query!( r#" INSERT INTO media (id, owner_id, storage_path, original_filename, mime_type, hash, created_at, thumbnail_path, date_taken) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) "#, media.id, media.owner_id, media.storage_path, media.original_filename, media.mime_type, media.hash, media.created_at, media.thumbnail_path, media.date_taken ) .execute(exec) .await .map_err(|e| CoreError::Database(e.to_string()))?; Ok(()) } } #[async_trait] impl MediaRepository for PostgresMediaRepository { async fn create(&self, media: &Media) -> CoreResult<()> { Self::create_internal(&self.pool, media).await } async fn find_by_hash(&self, hash: &str) -> CoreResult> { let pg_media = sqlx::query_as!( PostgresMedia, r#" SELECT id, owner_id, storage_path, original_filename, mime_type, hash, created_at, thumbnail_path, date_taken FROM media WHERE hash = $1 "#, hash ) .fetch_optional(&self.pool) .await .map_err(|e| CoreError::Database(e.to_string()))?; Ok(pg_media.map(|m| m.into())) } async fn find_by_id(&self, id: Uuid) -> CoreResult> { let pg_media = sqlx::query_as!( PostgresMedia, r#" SELECT id, owner_id, storage_path, original_filename, mime_type, hash, created_at, thumbnail_path, date_taken FROM media WHERE id = $1 "#, id ) .fetch_optional(&self.pool) .await .map_err(|e| CoreError::Database(e.to_string()))?; Ok(pg_media.map(|m| m.into())) } async fn list_by_user( &self, user_id: Uuid, options: &ListMediaOptions, ) -> CoreResult<(Vec, i64)> { let count_base_sql = "SELECT COUNT(DISTINCT media.id) as total FROM media"; let mut count_query = sqlx::QueryBuilder::new(count_base_sql); count_query.push(" WHERE media.owner_id = "); count_query.push_bind(user_id); let (mut count_query, metadata_filter_count) = self .query_builder .apply_filters_to_query(count_query, options)?; if metadata_filter_count > 0 { count_query.push(" GROUP BY media.id "); count_query.push(" HAVING COUNT(DISTINCT mm.tag_name) = "); count_query.push_bind(metadata_filter_count); let mut final_count_query = sqlx::QueryBuilder::new("SELECT COUNT(*) as total FROM ("); final_count_query.push(count_query.into_sql()); final_count_query.push(") as subquery"); count_query = final_count_query; } let total_items_result = count_query .build_query_scalar() .fetch_one(&self.pool) .await .map_err(|e| CoreError::Database(e.to_string()))?; let data_base_sql = "SELECT media.id, media.owner_id, media.storage_path, media.original_filename, media.mime_type, media.hash, media.created_at, media.thumbnail_path, media.date_taken FROM media"; let mut data_query = sqlx::QueryBuilder::new(data_base_sql); data_query.push(" WHERE media.owner_id = "); data_query.push_bind(user_id); let (mut data_query, metadata_filter_count) = self .query_builder .apply_filters_to_query(data_query, options)?; if metadata_filter_count > 0 { data_query.push(" GROUP BY media.id "); data_query.push(" HAVING COUNT(DISTINCT mm.tag_name) = "); data_query.push_bind(metadata_filter_count); } data_query = self .query_builder .apply_sorting_to_query(data_query, options)?; data_query = self .query_builder .apply_pagination_to_query(data_query, options)?; let pg_media = data_query .build_query_as::() .fetch_all(&self.pool) .await .map_err(|e| CoreError::Database(e.to_string()))?; let media_list = pg_media.into_iter().map(|m| m.into()).collect(); Ok((media_list, total_items_result)) } async fn list_by_person_id( &self, person_id: Uuid, options: &ListMediaOptions, ) -> CoreResult<(Vec, i64)> { let count_base_sql = " SELECT COUNT(DISTINCT media.id) as total FROM media JOIN face_regions fr ON media.id = fr.media_id "; let mut count_query = sqlx::QueryBuilder::new(count_base_sql); count_query.push(" WHERE fr.person_id = "); count_query.push_bind(person_id); let (mut count_query, _metadata_filter_count) = self .query_builder .apply_filters_to_query(count_query, options)?; let total_items_result = count_query .build_query_scalar() .fetch_one(&self.pool) .await .map_err(|e| CoreError::Database(e.to_string()))?; let data_base_sql = " SELECT media.id, media.owner_id, media.storage_path, media.original_filename, media.mime_type, media.hash, media.created_at, media.thumbnail_path, media.date_taken FROM media JOIN face_regions fr ON media.id = fr.media_id "; let mut data_query = sqlx::QueryBuilder::new(data_base_sql); data_query.push(" WHERE fr.person_id = "); data_query.push_bind(person_id); let (mut data_query, _metadata_filter_count) = self .query_builder .apply_filters_to_query(data_query, options)?; data_query.push(" GROUP BY media.id "); let data_query = self .query_builder .apply_sorting_to_query(data_query, options)?; let mut data_query = self .query_builder .apply_pagination_to_query(data_query, options)?; let pg_media = data_query .build_query_as::() .fetch_all(&self.pool) .await .map_err(|e| CoreError::Database(e.to_string()))?; let media_list = pg_media.into_iter().map(|m| m.into()).collect(); Ok((media_list, total_items_result)) } async fn update_thumbnail_path(&self, id: Uuid, thumbnail_path: String) -> CoreResult<()> { sqlx::query!( r#" UPDATE media SET thumbnail_path = $2 WHERE id = $1 "#, id, thumbnail_path ) .execute(&self.pool) .await .map_err(|e| CoreError::Database(e.to_string()))?; Ok(()) } async fn delete(&self, id: Uuid) -> CoreResult<()> { sqlx::query!( r#" DELETE FROM media WHERE id = $1 "#, id ) .execute(&self.pool) .await .map_err(|e| CoreError::Database(e.to_string()))?; Ok(()) } }