use std::{path::PathBuf, sync::Arc}; use async_trait::async_trait; use chrono::Datelike; use futures::stream::StreamExt; use libertas_core::{ authz, config::Config, error::{CoreError, CoreResult}, models::Media, repositories::{AlbumShareRepository, MediaRepository, UserRepository}, schema::UploadMediaData, services::MediaService, }; use serde_json::json; use sha2::{Digest, Sha256}; use tokio::{fs, io::AsyncWriteExt}; use uuid::Uuid; pub struct MediaServiceImpl { repo: Arc, user_repo: Arc, album_share_repo: Arc, config: Config, nats_client: async_nats::Client, } impl MediaServiceImpl { pub fn new( repo: Arc, user_repo: Arc, album_share_repo: Arc, config: Config, nats_client: async_nats::Client, ) -> Self { Self { repo, user_repo, album_share_repo, config, nats_client, } } } #[async_trait] impl MediaService for MediaServiceImpl { async fn upload_media(&self, mut data: UploadMediaData<'_>) -> CoreResult { let user = self .user_repo .find_by_id(data.owner_id) .await? .ok_or(CoreError::NotFound("User".to_string(), data.owner_id))?; let mut hasher = Sha256::new(); let mut file_bytes = Vec::new(); while let Some(chunk_result) = data.stream.next().await { let chunk = chunk_result.map_err(|e| CoreError::Io(e))?; hasher.update(&chunk); file_bytes.extend_from_slice(&chunk); } let file_size = file_bytes.len() as i64; if user.storage_used + file_size > user.storage_quota { return Err(CoreError::Auth(format!( "Storage quota exceeded. Used: {}, Quota: {}", user.storage_used, user.storage_quota ))); } let hash = format!("{:x}", hasher.finalize()); if self.repo.find_by_hash(&hash).await?.is_some() { return Err(CoreError::Duplicate( "A file with this content already exists".to_string(), )); } let now = chrono::Utc::now(); let year = now.year().to_string(); let month = format!("{:02}", now.month()); let mut dest_path = PathBuf::from(&self.config.media_library_path); dest_path.push(year.clone()); dest_path.push(month.clone()); fs::create_dir_all(&dest_path) .await .map_err(|e| CoreError::Io(e))?; dest_path.push(&data.filename); let storage_path_str = PathBuf::from(&year) .join(&month) .join(&data.filename) .to_string_lossy() .to_string(); let mut file = fs::File::create(&dest_path) .await .map_err(|e| CoreError::Io(e))?; file.write_all(&file_bytes) .await .map_err(|e| CoreError::Io(e))?; let media_model = Media { id: Uuid::new_v4(), owner_id: data.owner_id, storage_path: storage_path_str, original_filename: data.filename, mime_type: data.mime_type, hash, created_at: now, extracted_location: None, width: None, height: None, }; self.repo.create(&media_model).await?; self.user_repo .update_storage_used(user.id, file_size) .await?; let job_payload = json!({ "media_id": media_model.id }); self.nats_client .publish("media.new".to_string(), job_payload.to_string().into()) .await .map_err(|e| CoreError::Unknown(format!("Failed to publish NATS job: {}", e)))?; Ok(media_model) } async fn get_media_details(&self, id: Uuid, user_id: Uuid) -> CoreResult { let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; let user = self .user_repo .find_by_id(user_id) .await? .ok_or(CoreError::NotFound("User".to_string(), user_id))?; if authz::is_owner(user_id, &media) || authz::is_admin(&user) { return Ok(media); } let is_shared = self .album_share_repo .is_media_in_shared_album(id, user_id) .await?; if is_shared { return Ok(media); } Err(CoreError::Auth("Access denied".to_string())) } async fn list_user_media(&self, user_id: Uuid) -> CoreResult> { self.repo.list_by_user(user_id).await } async fn get_media_filepath(&self, id: Uuid, user_id: Uuid) -> CoreResult { let media = self .repo .find_by_id(id) .await? .ok_or(CoreError::NotFound("Media".to_string(), id))?; let user = self .user_repo .find_by_id(user_id) .await? .ok_or(CoreError::NotFound("User".to_string(), user_id))?; if authz::is_owner(user_id, &media) || authz::is_admin(&user) { return Ok(media.storage_path); } let is_shared = self .album_share_repo .is_media_in_shared_album(id, user_id) .await?; if is_shared { return Ok(media.storage_path); } Err(CoreError::Auth("Access denied".to_string())) } }