diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 97455f7..43b0180 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -2,7 +2,7 @@ use chrono::NaiveDateTime; use domain::{ errors::DomainError, events::DomainEvent, - value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, + value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId}, }; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -28,6 +28,10 @@ pub enum EventPayload { movie_id: String, external_metadata_id: String, }, + MovieDeleted { + movie_id: String, + poster_path: Option, + }, } impl EventPayload { @@ -36,6 +40,7 @@ impl EventPayload { EventPayload::ReviewLogged { .. } => "ReviewLogged", EventPayload::ReviewUpdated { .. } => "ReviewUpdated", EventPayload::MovieDiscovered { .. } => "MovieDiscovered", + EventPayload::MovieDeleted { .. } => "MovieDeleted", } } } @@ -78,6 +83,10 @@ impl From<&DomainEvent> for EventPayload { external_metadata_id: external_metadata_id.value().to_owned(), } } + DomainEvent::MovieDeleted { movie_id, poster_path } => EventPayload::MovieDeleted { + movie_id: movie_id.value().to_string(), + poster_path: poster_path.as_ref().map(|p| p.value().to_string()), + }, } } } @@ -110,6 +119,14 @@ impl TryFrom for DomainEvent { external_metadata_id: ExternalMetadataId::new(external_metadata_id)?, }) } + EventPayload::MovieDeleted { movie_id, poster_path } => { + let movie_id = MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?); + let poster_path = poster_path + .map(|p| PosterPath::new(p)) + .transpose() + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(DomainEvent::MovieDeleted { movie_id, poster_path }) + } } } } diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index 2deafca..7887b8a 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -5,6 +5,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::ReviewLogged { .. } => "review.logged", DomainEvent::ReviewUpdated { .. } => "review.updated", DomainEvent::MovieDiscovered { .. } => "movie.discovered", + DomainEvent::MovieDeleted { .. } => "movie.deleted", }; format!("{prefix}.{suffix}") } diff --git a/crates/adapters/poster-storage/src/lib.rs b/crates/adapters/poster-storage/src/lib.rs index 745561f..c04e5f4 100644 --- a/crates/adapters/poster-storage/src/lib.rs +++ b/crates/adapters/poster-storage/src/lib.rs @@ -4,7 +4,8 @@ pub use config::StorageConfig; use async_trait::async_trait; use domain::{ errors::DomainError, - ports::PosterStorage, + events::DomainEvent, + ports::{EventHandler, PosterStorage}, value_objects::{MovieId, PosterPath}, }; use object_store::{Attribute, Attributes, ObjectStore, PutOptions, path::Path}; @@ -52,6 +53,15 @@ impl PosterStorage for PosterStorageAdapter { PosterPath::new(path.to_string()) } + async fn delete_poster(&self, path: &PosterPath) -> Result<(), DomainError> { + let p = Path::from(path.value().to_string()); + match self.store.delete(&p).await { + Ok(()) => Ok(()), + Err(object_store::Error::NotFound { .. }) => Ok(()), + Err(e) => Err(DomainError::InfrastructureError(e.to_string())), + } + } + async fn get_poster(&self, poster_path: &PosterPath) -> Result, DomainError> { let path = Path::from(poster_path.value().to_string()); let result = self.store.get(&path).await.map_err(|e| match e { @@ -68,6 +78,31 @@ impl PosterStorage for PosterStorageAdapter { } } +pub struct PosterCleanupHandler { + poster_storage: Arc, +} + +impl PosterCleanupHandler { + pub fn new(poster_storage: Arc) -> Self { + Self { poster_storage } + } +} + +#[async_trait] +impl EventHandler for PosterCleanupHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let poster_path = match event { + DomainEvent::MovieDeleted { poster_path, .. } => poster_path, + _ => return Ok(()), + }; + let Some(path) = poster_path else { return Ok(()) }; + if let Err(e) = self.poster_storage.delete_poster(path).await { + tracing::warn!("poster cleanup failed for {}: {e}", path.value()); + } + Ok(()) + } +} + pub fn create() -> anyhow::Result> { Ok(std::sync::Arc::new(PosterStorageAdapter::from_config(StorageConfig::from_env()?))) } @@ -101,4 +136,69 @@ mod tests { let result = adapter.get_poster(&path).await; assert!(matches!(result, Err(DomainError::NotFound(_)))); } + + #[tokio::test] + async fn delete_poster_removes_file() { + let adapter = adapter(); + let movie_id = MovieId::from_uuid(Uuid::new_v4()); + let path = adapter.store_poster(&movie_id, b"img").await.unwrap(); + + adapter.delete_poster(&path).await.unwrap(); + + let result = adapter.get_poster(&path).await; + assert!(matches!(result, Err(DomainError::NotFound(_)))); + } + + #[tokio::test] + async fn delete_poster_missing_file_returns_ok() { + let adapter = adapter(); + let path = PosterPath::new("does-not-exist".into()).unwrap(); + assert!(adapter.delete_poster(&path).await.is_ok()); + } + + #[tokio::test] + async fn cleanup_handler_deletes_poster_on_movie_deleted() { + use domain::{events::DomainEvent, ports::EventHandler}; + + let inner = Arc::new(adapter()); + let path = inner + .store_poster(&MovieId::from_uuid(Uuid::new_v4()), b"img") + .await + .unwrap(); + let movie_id = MovieId::from_uuid(Uuid::new_v4()); + + let handler = PosterCleanupHandler::new(Arc::clone(&inner) as Arc); + handler + .handle(&DomainEvent::MovieDeleted { movie_id, poster_path: Some(path.clone()) }) + .await + .unwrap(); + + assert!(matches!(inner.get_poster(&path).await, Err(DomainError::NotFound(_)))); + } + + #[tokio::test] + async fn cleanup_handler_ignores_none_poster_path() { + use domain::{events::DomainEvent, ports::EventHandler}; + + let inner = Arc::new(adapter()); + let handler = PosterCleanupHandler::new(Arc::clone(&inner) as Arc); + let event = DomainEvent::MovieDeleted { + movie_id: MovieId::from_uuid(Uuid::new_v4()), + poster_path: None, + }; + handler.handle(&event).await.unwrap(); + } + + #[tokio::test] + async fn cleanup_handler_ignores_other_events() { + use domain::{events::DomainEvent, ports::EventHandler, value_objects::ExternalMetadataId}; + + let inner = Arc::new(adapter()); + let handler = PosterCleanupHandler::new(Arc::clone(&inner) as Arc); + let event = DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(Uuid::new_v4()), + external_metadata_id: ExternalMetadataId::new("tt1234567".to_string()).unwrap(), + }; + handler.handle(&event).await.unwrap(); + } } diff --git a/crates/application/src/use_cases/delete_review.rs b/crates/application/src/use_cases/delete_review.rs index e55a194..79039f7 100644 --- a/crates/application/src/use_cases/delete_review.rs +++ b/crates/application/src/use_cases/delete_review.rs @@ -1,6 +1,7 @@ use crate::{commands::DeleteReviewCommand, context::AppContext}; use domain::{ errors::DomainError, + events::DomainEvent, value_objects::{ReviewId, UserId}, }; @@ -23,7 +24,15 @@ pub async fn execute(ctx: &AppContext, cmd: DeleteReviewCommand) -> Result<(), D let history = ctx.diary_repository.get_review_history(&movie_id).await?; if history.viewings().is_empty() { + let poster_path = history.movie().poster_path().cloned(); ctx.movie_repository.delete_movie(&movie_id).await?; + // best-effort: movie is already deleted, so publish failure is non-fatal + if let Err(e) = ctx.event_publisher + .publish(&DomainEvent::MovieDeleted { movie_id, poster_path }) + .await + { + tracing::warn!("failed to publish MovieDeleted event: {e}"); + } } Ok(()) diff --git a/crates/application/src/worker.rs b/crates/application/src/worker.rs index 050fd55..ab13714 100644 --- a/crates/application/src/worker.rs +++ b/crates/application/src/worker.rs @@ -93,6 +93,7 @@ mod tests { DomainEvent::MovieDiscovered { .. } => "movie_discovered", DomainEvent::ReviewLogged { .. } => "review_logged", DomainEvent::ReviewUpdated { .. } => "review_updated", + DomainEvent::MovieDeleted { .. } => "movie_deleted", }; self.calls.lock().unwrap().push(label); Ok(()) diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 7ff5bf4..0d2df94 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -3,7 +3,7 @@ use chrono::NaiveDateTime; use crate::{ errors::DomainError, - value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, + value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId}, }; #[derive(Clone, Debug)] @@ -26,6 +26,10 @@ pub enum DomainEvent { movie_id: MovieId, external_metadata_id: ExternalMetadataId, }, + MovieDeleted { + movie_id: MovieId, + poster_path: Option, + }, } #[async_trait] diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index e4fa41f..b77a7f3 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -158,6 +158,8 @@ pub trait PosterStorage: Send + Sync { ) -> Result; async fn get_poster(&self, poster_path: &PosterPath) -> Result, DomainError>; + + async fn delete_poster(&self, path: &PosterPath) -> Result<(), DomainError>; } pub struct GeneratedToken { diff --git a/crates/presentation/src/extractors.rs b/crates/presentation/src/extractors.rs index 8f3ed46..b69eb01 100644 --- a/crates/presentation/src/extractors.rs +++ b/crates/presentation/src/extractors.rs @@ -286,6 +286,9 @@ mod tests { async fn get_poster(&self, _: &PosterPath) -> Result, DomainError> { panic!() } + async fn delete_poster(&self, _: &PosterPath) -> Result<(), DomainError> { + panic!() + } } #[async_trait::async_trait] impl AuthService for Panic { diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index 68c317e..6dec425 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -8,6 +8,15 @@ use crate::{handlers, state::AppState}; pub fn build_router(state: AppState, ap_router: Router) -> Router { let rate_limit = state.app_ctx.config.rate_limit; + + let ap_cfg = GovernorConfigBuilder::default() + .with_extractor(PeerIp::default()) + .expect_connect_info() + .quota_default(per_minute(rate_limit)) + .finish() + .unwrap(); + let ap_router = ap_router.layer(GovernorLayer::new(ap_cfg)); + Router::new() .merge(html_routes(rate_limit)) .merge(api_routes(rate_limit)) diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index 82509a0..637329a 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -66,6 +66,9 @@ impl PosterStorage for PanicStorage { async fn get_poster(&self, _: &PosterPath) -> Result, DomainError> { panic!() } + async fn delete_poster(&self, _: &PosterPath) -> Result<(), DomainError> { + panic!() + } } struct PanicHasher; diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 2c2d50c..322ccce 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -116,8 +116,12 @@ async fn main() -> anyhow::Result<()> { 3, )) as Arc; + let cleanup = Arc::new(poster_storage::PosterCleanupHandler::new( + Arc::clone(&ctx.poster_storage), + )) as Arc; + #[cfg(not(feature = "federation"))] - { vec![poster] } + { vec![poster, cleanup] } #[cfg(feature = "federation")] { @@ -139,7 +143,7 @@ async fn main() -> anyhow::Result<()> { ).await?.event_handler; tracing::info!("federation event handler registered"); - vec![poster, ap] + vec![poster, cleanup, ap] } };