inbox rate limiting + poster storage cleanup
This commit is contained in:
@@ -2,7 +2,7 @@ use chrono::NaiveDateTime;
|
|||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
events::DomainEvent,
|
events::DomainEvent,
|
||||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId},
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@@ -28,6 +28,10 @@ pub enum EventPayload {
|
|||||||
movie_id: String,
|
movie_id: String,
|
||||||
external_metadata_id: String,
|
external_metadata_id: String,
|
||||||
},
|
},
|
||||||
|
MovieDeleted {
|
||||||
|
movie_id: String,
|
||||||
|
poster_path: Option<String>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventPayload {
|
impl EventPayload {
|
||||||
@@ -36,6 +40,7 @@ impl EventPayload {
|
|||||||
EventPayload::ReviewLogged { .. } => "ReviewLogged",
|
EventPayload::ReviewLogged { .. } => "ReviewLogged",
|
||||||
EventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
EventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
||||||
EventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
EventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
||||||
|
EventPayload::MovieDeleted { .. } => "MovieDeleted",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -78,6 +83,10 @@ impl From<&DomainEvent> for EventPayload {
|
|||||||
external_metadata_id: external_metadata_id.value().to_owned(),
|
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
DomainEvent::MovieDeleted { movie_id, poster_path } => EventPayload::MovieDeleted {
|
||||||
|
movie_id: movie_id.value().to_string(),
|
||||||
|
poster_path: poster_path.as_ref().map(|p| p.value().to_string()),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -110,6 +119,14 @@ impl TryFrom<EventPayload> for DomainEvent {
|
|||||||
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
EventPayload::MovieDeleted { movie_id, poster_path } => {
|
||||||
|
let movie_id = MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?);
|
||||||
|
let poster_path = poster_path
|
||||||
|
.map(|p| PosterPath::new(p))
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||||
|
Ok(DomainEvent::MovieDeleted { movie_id, poster_path })
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
|
|||||||
DomainEvent::ReviewLogged { .. } => "review.logged",
|
DomainEvent::ReviewLogged { .. } => "review.logged",
|
||||||
DomainEvent::ReviewUpdated { .. } => "review.updated",
|
DomainEvent::ReviewUpdated { .. } => "review.updated",
|
||||||
DomainEvent::MovieDiscovered { .. } => "movie.discovered",
|
DomainEvent::MovieDiscovered { .. } => "movie.discovered",
|
||||||
|
DomainEvent::MovieDeleted { .. } => "movie.deleted",
|
||||||
};
|
};
|
||||||
format!("{prefix}.{suffix}")
|
format!("{prefix}.{suffix}")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,8 @@ pub use config::StorageConfig;
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
ports::PosterStorage,
|
events::DomainEvent,
|
||||||
|
ports::{EventHandler, PosterStorage},
|
||||||
value_objects::{MovieId, PosterPath},
|
value_objects::{MovieId, PosterPath},
|
||||||
};
|
};
|
||||||
use object_store::{Attribute, Attributes, ObjectStore, PutOptions, path::Path};
|
use object_store::{Attribute, Attributes, ObjectStore, PutOptions, path::Path};
|
||||||
@@ -52,6 +53,15 @@ impl PosterStorage for PosterStorageAdapter {
|
|||||||
PosterPath::new(path.to_string())
|
PosterPath::new(path.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn delete_poster(&self, path: &PosterPath) -> Result<(), DomainError> {
|
||||||
|
let p = Path::from(path.value().to_string());
|
||||||
|
match self.store.delete(&p).await {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(object_store::Error::NotFound { .. }) => Ok(()),
|
||||||
|
Err(e) => Err(DomainError::InfrastructureError(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_poster(&self, poster_path: &PosterPath) -> Result<Vec<u8>, DomainError> {
|
async fn get_poster(&self, poster_path: &PosterPath) -> Result<Vec<u8>, DomainError> {
|
||||||
let path = Path::from(poster_path.value().to_string());
|
let path = Path::from(poster_path.value().to_string());
|
||||||
let result = self.store.get(&path).await.map_err(|e| match e {
|
let result = self.store.get(&path).await.map_err(|e| match e {
|
||||||
@@ -68,6 +78,31 @@ impl PosterStorage for PosterStorageAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct PosterCleanupHandler {
|
||||||
|
poster_storage: Arc<dyn PosterStorage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PosterCleanupHandler {
|
||||||
|
pub fn new(poster_storage: Arc<dyn PosterStorage>) -> Self {
|
||||||
|
Self { poster_storage }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventHandler for PosterCleanupHandler {
|
||||||
|
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let poster_path = match event {
|
||||||
|
DomainEvent::MovieDeleted { poster_path, .. } => poster_path,
|
||||||
|
_ => return Ok(()),
|
||||||
|
};
|
||||||
|
let Some(path) = poster_path else { return Ok(()) };
|
||||||
|
if let Err(e) = self.poster_storage.delete_poster(path).await {
|
||||||
|
tracing::warn!("poster cleanup failed for {}: {e}", path.value());
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn create() -> anyhow::Result<std::sync::Arc<dyn domain::ports::PosterStorage>> {
|
pub fn create() -> anyhow::Result<std::sync::Arc<dyn domain::ports::PosterStorage>> {
|
||||||
Ok(std::sync::Arc::new(PosterStorageAdapter::from_config(StorageConfig::from_env()?)))
|
Ok(std::sync::Arc::new(PosterStorageAdapter::from_config(StorageConfig::from_env()?)))
|
||||||
}
|
}
|
||||||
@@ -101,4 +136,69 @@ mod tests {
|
|||||||
let result = adapter.get_poster(&path).await;
|
let result = adapter.get_poster(&path).await;
|
||||||
assert!(matches!(result, Err(DomainError::NotFound(_))));
|
assert!(matches!(result, Err(DomainError::NotFound(_))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn delete_poster_removes_file() {
|
||||||
|
let adapter = adapter();
|
||||||
|
let movie_id = MovieId::from_uuid(Uuid::new_v4());
|
||||||
|
let path = adapter.store_poster(&movie_id, b"img").await.unwrap();
|
||||||
|
|
||||||
|
adapter.delete_poster(&path).await.unwrap();
|
||||||
|
|
||||||
|
let result = adapter.get_poster(&path).await;
|
||||||
|
assert!(matches!(result, Err(DomainError::NotFound(_))));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn delete_poster_missing_file_returns_ok() {
|
||||||
|
let adapter = adapter();
|
||||||
|
let path = PosterPath::new("does-not-exist".into()).unwrap();
|
||||||
|
assert!(adapter.delete_poster(&path).await.is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn cleanup_handler_deletes_poster_on_movie_deleted() {
|
||||||
|
use domain::{events::DomainEvent, ports::EventHandler};
|
||||||
|
|
||||||
|
let inner = Arc::new(adapter());
|
||||||
|
let path = inner
|
||||||
|
.store_poster(&MovieId::from_uuid(Uuid::new_v4()), b"img")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let movie_id = MovieId::from_uuid(Uuid::new_v4());
|
||||||
|
|
||||||
|
let handler = PosterCleanupHandler::new(Arc::clone(&inner) as Arc<dyn PosterStorage>);
|
||||||
|
handler
|
||||||
|
.handle(&DomainEvent::MovieDeleted { movie_id, poster_path: Some(path.clone()) })
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(matches!(inner.get_poster(&path).await, Err(DomainError::NotFound(_))));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn cleanup_handler_ignores_none_poster_path() {
|
||||||
|
use domain::{events::DomainEvent, ports::EventHandler};
|
||||||
|
|
||||||
|
let inner = Arc::new(adapter());
|
||||||
|
let handler = PosterCleanupHandler::new(Arc::clone(&inner) as Arc<dyn PosterStorage>);
|
||||||
|
let event = DomainEvent::MovieDeleted {
|
||||||
|
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||||
|
poster_path: None,
|
||||||
|
};
|
||||||
|
handler.handle(&event).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn cleanup_handler_ignores_other_events() {
|
||||||
|
use domain::{events::DomainEvent, ports::EventHandler, value_objects::ExternalMetadataId};
|
||||||
|
|
||||||
|
let inner = Arc::new(adapter());
|
||||||
|
let handler = PosterCleanupHandler::new(Arc::clone(&inner) as Arc<dyn PosterStorage>);
|
||||||
|
let event = DomainEvent::MovieDiscovered {
|
||||||
|
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||||
|
external_metadata_id: ExternalMetadataId::new("tt1234567".to_string()).unwrap(),
|
||||||
|
};
|
||||||
|
handler.handle(&event).await.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use crate::{commands::DeleteReviewCommand, context::AppContext};
|
use crate::{commands::DeleteReviewCommand, context::AppContext};
|
||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
value_objects::{ReviewId, UserId},
|
value_objects::{ReviewId, UserId},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -23,7 +24,15 @@ pub async fn execute(ctx: &AppContext, cmd: DeleteReviewCommand) -> Result<(), D
|
|||||||
|
|
||||||
let history = ctx.diary_repository.get_review_history(&movie_id).await?;
|
let history = ctx.diary_repository.get_review_history(&movie_id).await?;
|
||||||
if history.viewings().is_empty() {
|
if history.viewings().is_empty() {
|
||||||
|
let poster_path = history.movie().poster_path().cloned();
|
||||||
ctx.movie_repository.delete_movie(&movie_id).await?;
|
ctx.movie_repository.delete_movie(&movie_id).await?;
|
||||||
|
// best-effort: movie is already deleted, so publish failure is non-fatal
|
||||||
|
if let Err(e) = ctx.event_publisher
|
||||||
|
.publish(&DomainEvent::MovieDeleted { movie_id, poster_path })
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!("failed to publish MovieDeleted event: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ mod tests {
|
|||||||
DomainEvent::MovieDiscovered { .. } => "movie_discovered",
|
DomainEvent::MovieDiscovered { .. } => "movie_discovered",
|
||||||
DomainEvent::ReviewLogged { .. } => "review_logged",
|
DomainEvent::ReviewLogged { .. } => "review_logged",
|
||||||
DomainEvent::ReviewUpdated { .. } => "review_updated",
|
DomainEvent::ReviewUpdated { .. } => "review_updated",
|
||||||
|
DomainEvent::MovieDeleted { .. } => "movie_deleted",
|
||||||
};
|
};
|
||||||
self.calls.lock().unwrap().push(label);
|
self.calls.lock().unwrap().push(label);
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use chrono::NaiveDateTime;
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@@ -26,6 +26,10 @@ pub enum DomainEvent {
|
|||||||
movie_id: MovieId,
|
movie_id: MovieId,
|
||||||
external_metadata_id: ExternalMetadataId,
|
external_metadata_id: ExternalMetadataId,
|
||||||
},
|
},
|
||||||
|
MovieDeleted {
|
||||||
|
movie_id: MovieId,
|
||||||
|
poster_path: Option<PosterPath>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -158,6 +158,8 @@ pub trait PosterStorage: Send + Sync {
|
|||||||
) -> Result<PosterPath, DomainError>;
|
) -> Result<PosterPath, DomainError>;
|
||||||
|
|
||||||
async fn get_poster(&self, poster_path: &PosterPath) -> Result<Vec<u8>, DomainError>;
|
async fn get_poster(&self, poster_path: &PosterPath) -> Result<Vec<u8>, DomainError>;
|
||||||
|
|
||||||
|
async fn delete_poster(&self, path: &PosterPath) -> Result<(), DomainError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct GeneratedToken {
|
pub struct GeneratedToken {
|
||||||
|
|||||||
@@ -286,6 +286,9 @@ mod tests {
|
|||||||
async fn get_poster(&self, _: &PosterPath) -> Result<Vec<u8>, DomainError> {
|
async fn get_poster(&self, _: &PosterPath) -> Result<Vec<u8>, DomainError> {
|
||||||
panic!()
|
panic!()
|
||||||
}
|
}
|
||||||
|
async fn delete_poster(&self, _: &PosterPath) -> Result<(), DomainError> {
|
||||||
|
panic!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl AuthService for Panic {
|
impl AuthService for Panic {
|
||||||
|
|||||||
@@ -8,6 +8,15 @@ use crate::{handlers, state::AppState};
|
|||||||
|
|
||||||
pub fn build_router(state: AppState, ap_router: Router) -> Router {
|
pub fn build_router(state: AppState, ap_router: Router) -> Router {
|
||||||
let rate_limit = state.app_ctx.config.rate_limit;
|
let rate_limit = state.app_ctx.config.rate_limit;
|
||||||
|
|
||||||
|
let ap_cfg = GovernorConfigBuilder::default()
|
||||||
|
.with_extractor(PeerIp::default())
|
||||||
|
.expect_connect_info()
|
||||||
|
.quota_default(per_minute(rate_limit))
|
||||||
|
.finish()
|
||||||
|
.unwrap();
|
||||||
|
let ap_router = ap_router.layer(GovernorLayer::new(ap_cfg));
|
||||||
|
|
||||||
Router::new()
|
Router::new()
|
||||||
.merge(html_routes(rate_limit))
|
.merge(html_routes(rate_limit))
|
||||||
.merge(api_routes(rate_limit))
|
.merge(api_routes(rate_limit))
|
||||||
|
|||||||
@@ -66,6 +66,9 @@ impl PosterStorage for PanicStorage {
|
|||||||
async fn get_poster(&self, _: &PosterPath) -> Result<Vec<u8>, DomainError> {
|
async fn get_poster(&self, _: &PosterPath) -> Result<Vec<u8>, DomainError> {
|
||||||
panic!()
|
panic!()
|
||||||
}
|
}
|
||||||
|
async fn delete_poster(&self, _: &PosterPath) -> Result<(), DomainError> {
|
||||||
|
panic!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PanicHasher;
|
struct PanicHasher;
|
||||||
|
|||||||
@@ -116,8 +116,12 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
3,
|
3,
|
||||||
)) as Arc<dyn EventHandler>;
|
)) as Arc<dyn EventHandler>;
|
||||||
|
|
||||||
|
let cleanup = Arc::new(poster_storage::PosterCleanupHandler::new(
|
||||||
|
Arc::clone(&ctx.poster_storage),
|
||||||
|
)) as Arc<dyn EventHandler>;
|
||||||
|
|
||||||
#[cfg(not(feature = "federation"))]
|
#[cfg(not(feature = "federation"))]
|
||||||
{ vec![poster] }
|
{ vec![poster, cleanup] }
|
||||||
|
|
||||||
#[cfg(feature = "federation")]
|
#[cfg(feature = "federation")]
|
||||||
{
|
{
|
||||||
@@ -139,7 +143,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
).await?.event_handler;
|
).await?.event_handler;
|
||||||
|
|
||||||
tracing::info!("federation event handler registered");
|
tracing::info!("federation event handler registered");
|
||||||
vec![poster, ap]
|
vec![poster, cleanup, ap]
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user