use std::{ path::{Path, PathBuf}, sync::Arc, }; use async_trait::async_trait; use futures::stream::StreamExt; use libertas_core::{ authz, config::AppConfig, error::{CoreError, CoreResult}, media_utils::{extract_exif_data_from_bytes, get_storage_path_and_date}, models::{Media, MediaBundle}, repositories::{MediaMetadataRepository, MediaRepository, UserRepository}, schema::{ListMediaOptions, PaginatedResponse, UploadMediaData}, services::{AuthorizationService, MediaService}, }; use serde_json::json; use sha2::{Digest, Sha256}; use tokio::{fs, io::AsyncWriteExt}; use uuid::Uuid; pub struct MediaServiceImpl { repo: Arc, user_repo: Arc, metadata_repo: Arc, auth_service: Arc, config: AppConfig, nats_client: async_nats::Client, } impl MediaServiceImpl { pub fn new( repo: Arc, user_repo: Arc, metadata_repo: Arc, auth_service: Arc, config: AppConfig, nats_client: async_nats::Client, ) -> Self { Self { repo, user_repo, metadata_repo, auth_service, config, nats_client, } } } #[async_trait] impl MediaService for MediaServiceImpl { async fn upload_media(&self, mut data: UploadMediaData<'_>) -> CoreResult { let (file_bytes, hash, file_size) = self.hash_and_buffer_stream(&mut data).await?; let owner_id = data.owner_id; let filename = data.filename; let mime_type = data.mime_type; self.check_upload_prerequisites(owner_id, file_size, &hash) .await?; let file_bytes_clone = file_bytes.clone(); let extracted_data = tokio::task::spawn_blocking(move || extract_exif_data_from_bytes(&file_bytes_clone)) .await .unwrap()?; let (storage_path_buf, _date_taken) = get_storage_path_and_date(&extracted_data, &filename); let storage_path_str = self .persist_media_file(&file_bytes, &storage_path_buf) .await?; let media = self .persist_media_metadata( owner_id, filename, mime_type, storage_path_str, hash, file_size, ) .await?; self.publish_new_media_job(media.id).await?; Ok(media) } async fn get_media_details(&self, id: Uuid, user_id: Option) -> CoreResult { self.auth_service .check_permission(user_id, authz::Permission::ViewMedia(id)) .await?; let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; let metadata = self.metadata_repo.find_by_media_id(id).await?; Ok(MediaBundle { media, metadata }) } async fn list_user_media( &self, user_id: Uuid, options: ListMediaOptions, ) -> CoreResult> { let (data, total_items) = self.repo.list_by_user(user_id, &options).await?; let pagination = options.pagination.unwrap(); let response = PaginatedResponse::new(data, pagination.page, pagination.limit, total_items); Ok(response) } async fn get_media_filepath(&self, id: Uuid, user_id: Option) -> CoreResult { self.auth_service .check_permission(user_id, authz::Permission::ViewMedia(id)) .await?; let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; Ok(media.storage_path) } async fn get_media_thumbnail_path( &self, id: Uuid, user_id: Option, ) -> CoreResult { self.auth_service .check_permission(user_id, authz::Permission::ViewMedia(id)) .await?; let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; media .thumbnail_path .ok_or(CoreError::NotFound("Thumbnail for Media".to_string(), id)) } async fn get_media_for_serving(&self, id: Uuid, user_id: Option) -> CoreResult { self.auth_service .check_permission(user_id, authz::Permission::ViewMedia(id)) .await?; let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; Ok(media) } async fn delete_media(&self, id: Uuid, user_id: Uuid) -> CoreResult<()> { self.auth_service .check_permission(Some(user_id), authz::Permission::DeleteMedia(id)) .await?; let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; let user = self .user_repo .find_by_id(user_id) .await? .ok_or(CoreError::NotFound("User".to_string(), user_id))?; let full_path = PathBuf::from(&self.config.media_library_path).join(&media.storage_path); self.repo.delete(id).await?; let file_size = match fs::metadata(&full_path).await { Ok(metadata) => metadata.len() as i64, Err(_) => 0, }; if let Err(e) = fs::remove_file(full_path).await { tracing::error!("Failed to delete media file from disk: {}", e); } self.user_repo .update_storage_used(user.id, -file_size) .await?; let job_payload = json!({ "storage_path": media.storage_path, "thumbnail_path": media.thumbnail_path }); self.nats_client .publish("media.deleted".to_string(), job_payload.to_string().into()) .await .map_err(|e| CoreError::Unknown(format!("Failed to publish NATS job: {}", e)))?; Ok(()) } } impl MediaServiceImpl { async fn hash_and_buffer_stream( &self, stream: &mut UploadMediaData<'_>, ) -> CoreResult<(Vec, String, i64)> { let mut hasher = Sha256::new(); let mut file_bytes = Vec::new(); while let Some(chunk_result) = stream.stream.next().await { let chunk = chunk_result.map_err(|e| CoreError::Io(e))?; hasher.update(&chunk); file_bytes.extend_from_slice(&chunk); } let file_size = file_bytes.len() as i64; let hash = format!("{:x}", hasher.finalize()); Ok((file_bytes, hash, file_size)) } async fn check_upload_prerequisites( &self, user_id: Uuid, file_size: i64, hash: &str, ) -> CoreResult<()> { let user = self .user_repo .find_by_id(user_id) .await? .ok_or(CoreError::NotFound("User".to_string(), user_id))?; if user.storage_used + file_size > user.storage_quota { return Err(CoreError::Auth(format!( "Storage quota exceeded. Used: {}, Quota: {}", user.storage_used, user.storage_quota ))); } if self.repo.find_by_hash(hash).await?.is_some() { return Err(CoreError::Duplicate( "A file with this content already exists".to_string(), )); } Ok(()) } async fn persist_media_file( &self, file_bytes: &[u8], storage_path: &Path, ) -> CoreResult { let mut dest_path = PathBuf::from(&self.config.media_library_path); dest_path.push(storage_path); if let Some(parent) = dest_path.parent() { fs::create_dir_all(parent).await?; } let mut file = fs::File::create(&dest_path).await?; file.write_all(&file_bytes).await?; Ok(storage_path.to_string_lossy().to_string()) } async fn persist_media_metadata( &self, owner_id: Uuid, filename: String, mime_type: String, storage_path: String, hash: String, file_size: i64, ) -> CoreResult { let media_model = Media { id: Uuid::new_v4(), owner_id, storage_path, original_filename: filename, mime_type, hash, created_at: chrono::Utc::now(), thumbnail_path: None, }; self.repo.create(&media_model).await?; self.user_repo .update_storage_used(owner_id, file_size) .await?; Ok(media_model) } async fn publish_new_media_job(&self, media_id: Uuid) -> CoreResult<()> { let job_payload = json!({ "media_id": media_id }); self.nats_client .publish("media.new".to_string(), job_payload.to_string().into()) .await .map_err(|e| CoreError::Unknown(format!("Failed to publish NATS job: {}", e))) } }