use std::{path::PathBuf, sync::Arc}; use async_trait::async_trait; use chrono::Datelike; use futures::stream::StreamExt; use libertas_core::{ authz, config::Config, error::{CoreError, CoreResult}, models::Media, repositories::{AlbumShareRepository, MediaRepository, UserRepository}, schema::{ListMediaOptions, UploadMediaData}, services::MediaService, }; use serde_json::json; use sha2::{Digest, Sha256}; use tokio::{fs, io::AsyncWriteExt}; use uuid::Uuid; pub struct MediaServiceImpl { repo: Arc, user_repo: Arc, album_share_repo: Arc, config: Config, nats_client: async_nats::Client, } impl MediaServiceImpl { pub fn new( repo: Arc, user_repo: Arc, album_share_repo: Arc, config: Config, nats_client: async_nats::Client, ) -> Self { Self { repo, user_repo, album_share_repo, config, nats_client, } } } #[async_trait] impl MediaService for MediaServiceImpl { async fn upload_media(&self, mut data: UploadMediaData<'_>) -> CoreResult { let (file_bytes, hash, file_size) = self.hash_and_buffer_stream(&mut data).await?; let owner_id = data.owner_id; let filename = data.filename; let mime_type = data.mime_type; self.check_upload_prerequisites(owner_id, file_size, &hash) .await?; let storage_path = self.persist_media_file(&file_bytes, &filename).await?; let media = self .persist_media_metadata(owner_id, filename, mime_type, storage_path, hash, file_size) .await?; self.publish_new_media_job(media.id).await?; Ok(media) } async fn get_media_details(&self, id: Uuid, user_id: Uuid) -> CoreResult { let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; let user = self .user_repo .find_by_id(user_id) .await? .ok_or(CoreError::NotFound("User".to_string(), user_id))?; if authz::is_owner(user_id, &media) || authz::is_admin(&user) { return Ok(media); } let is_shared = self .album_share_repo .is_media_in_shared_album(id, user_id) .await?; if is_shared { return Ok(media); } Err(CoreError::Auth("Access denied".to_string())) } async fn list_user_media(&self, user_id: Uuid, options: ListMediaOptions) -> CoreResult> { self.repo.list_by_user(user_id, &options).await } async fn get_media_filepath(&self, id: Uuid, user_id: Uuid) -> CoreResult { let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; let user = self .user_repo .find_by_id(user_id) .await? .ok_or(CoreError::NotFound("User".to_string(), user_id))?; if authz::is_owner(user_id, &media) || authz::is_admin(&user) { return Ok(media.storage_path); } let is_shared = self .album_share_repo .is_media_in_shared_album(id, user_id) .await?; if is_shared { return Ok(media.storage_path); } Err(CoreError::Auth("Access denied".to_string())) } async fn delete_media(&self, id: Uuid, user_id: Uuid) -> CoreResult<()> { let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; let user = self .user_repo .find_by_id(user_id) .await? .ok_or(CoreError::NotFound("User".to_string(), user_id))?; if !authz::is_owner(user_id, &media) && !authz::is_admin(&user) { return Err(CoreError::Auth("Access denied".to_string())); } let full_path = PathBuf::from(&self.config.media_library_path).join(&media.storage_path); self.repo.delete(id).await?; let file_size = match fs::metadata(&full_path).await { Ok(metadata) => metadata.len() as i64, Err(_) => 0, }; if let Err(e) = fs::remove_file(full_path).await { tracing::error!("Failed to delete media file from disk: {}", e); } self.user_repo .update_storage_used(user.id, -file_size) .await?; let job_payload = json!({ "storage_path": media.storage_path }); self.nats_client .publish("media.deleted".to_string(), job_payload.to_string().into()) .await .map_err(|e| CoreError::Unknown(format!("Failed to publish NATS job: {}", e)))?; Ok(()) } } impl MediaServiceImpl { async fn hash_and_buffer_stream( &self, stream: &mut UploadMediaData<'_>, ) -> CoreResult<(Vec, String, i64)> { let mut hasher = Sha256::new(); let mut file_bytes = Vec::new(); while let Some(chunk_result) = stream.stream.next().await { let chunk = chunk_result.map_err(|e| CoreError::Io(e))?; hasher.update(&chunk); file_bytes.extend_from_slice(&chunk); } let file_size = file_bytes.len() as i64; let hash = format!("{:x}", hasher.finalize()); Ok((file_bytes, hash, file_size)) } async fn check_upload_prerequisites( &self, user_id: Uuid, file_size: i64, hash: &str, ) -> CoreResult<()> { let user = self .user_repo .find_by_id(user_id) .await? .ok_or(CoreError::NotFound("User".to_string(), user_id))?; if user.storage_used + file_size > user.storage_quota { return Err(CoreError::Auth(format!( "Storage quota exceeded. Used: {}, Quota: {}", user.storage_used, user.storage_quota ))); } if self.repo.find_by_hash(hash).await?.is_some() { return Err(CoreError::Duplicate( "A file with this content already exists".to_string(), )); } Ok(()) } async fn persist_media_file(&self, file_bytes: &[u8], filename: &str) -> CoreResult { let now = chrono::Utc::now(); let year = now.year().to_string(); let month = format!("{:02}", now.month()); let mut dest_path = PathBuf::from(&self.config.media_library_path); dest_path.push(year.clone()); dest_path.push(month.clone()); fs::create_dir_all(&dest_path).await?; dest_path.push(filename); let storage_path_str = PathBuf::from(&year) .join(&month) .join(filename) .to_string_lossy() .to_string(); let mut file = fs::File::create(&dest_path).await?; file.write_all(&file_bytes).await?; Ok(storage_path_str) } async fn persist_media_metadata( &self, owner_id: Uuid, filename: String, mime_type: String, storage_path: String, hash: String, file_size: i64, ) -> CoreResult { let media_model = Media { id: Uuid::new_v4(), owner_id, storage_path, original_filename: filename, mime_type, hash, created_at: chrono::Utc::now(), thumbnail_path: None, }; self.repo.create(&media_model).await?; self.user_repo .update_storage_used(owner_id, file_size) .await?; Ok(media_model) } async fn publish_new_media_job(&self, media_id: Uuid) -> CoreResult<()> { let job_payload = json!({ "media_id": media_id }); self.nats_client .publish("media.new".to_string(), job_payload.to_string().into()) .await .map_err(|e| CoreError::Unknown(format!("Failed to publish NATS job: {}", e))) } }