diff --git a/crates/application/src/integrations/cleanup.rs b/crates/application/src/integrations/cleanup.rs index 8675d41..62e5d57 100644 --- a/crates/application/src/integrations/cleanup.rs +++ b/crates/application/src/integrations/cleanup.rs @@ -1,14 +1,11 @@ +use std::sync::Arc; + use chrono::Duration; -use domain::errors::DomainError; +use domain::{errors::DomainError, ports::WatchEventRepository}; -use crate::context::AppContext; - -pub async fn execute(ctx: &AppContext) -> Result { +pub async fn execute(watch_event: Arc) -> Result { let cutoff = chrono::Utc::now().naive_utc() - Duration::days(30); - ctx.repos - .watch_event - .delete_non_pending_older_than(cutoff) - .await + watch_event.delete_non_pending_older_than(cutoff).await } #[cfg(test)] diff --git a/crates/application/src/integrations/confirm.rs b/crates/application/src/integrations/confirm.rs index 058a315..8b8616d 100644 --- a/crates/application/src/integrations/confirm.rs +++ b/crates/application/src/integrations/confirm.rs @@ -1,24 +1,29 @@ +use std::sync::Arc; + use domain::{ errors::DomainError, models::WatchEventStatus, + ports::WatchEventRepository, value_objects::{UserId, WatchEventId}, }; use crate::{ - context::AppContext, diary::commands::{LogReviewCommand, MovieInput}, integrations::commands::ConfirmWatchEventsCommand, + ports::ReviewLogger, }; -pub async fn execute(ctx: &AppContext, cmd: ConfirmWatchEventsCommand) -> Result { +pub async fn execute( + watch_event: Arc, + review_logger: Arc, + cmd: ConfirmWatchEventsCommand, +) -> Result { let user_id = UserId::from_uuid(cmd.user_id); let mut confirmed = 0u32; for c in cmd.confirmations { let event_id = WatchEventId::from_uuid(c.watch_event_id); - let event = ctx - .repos - .watch_event + let event = watch_event .get_by_id(&event_id) .await? .ok_or_else(|| DomainError::NotFound(format!("WatchEvent {}", c.watch_event_id)))?; @@ -53,10 +58,9 @@ pub async fn execute(ctx: &AppContext, cmd: ConfirmWatchEventsCommand) -> Result watched_at: *event.watched_at(), }; - ctx.services.review_logger.log_review(review_cmd).await?; + review_logger.log_review(review_cmd).await?; - ctx.repos - .watch_event + watch_event .update_status(&event_id, WatchEventStatus::Confirmed) .await?; diff --git a/crates/application/src/integrations/deps.rs b/crates/application/src/integrations/deps.rs new file mode 100644 index 0000000..8744886 --- /dev/null +++ b/crates/application/src/integrations/deps.rs @@ -0,0 +1,9 @@ +use std::sync::Arc; + +use domain::ports::{EventPublisher, WatchEventRepository, WebhookTokenRepository}; + +pub struct IngestWatchEventDeps { + pub webhook_token: Arc, + pub watch_event: Arc, + pub event_publisher: Arc, +} diff --git a/crates/application/src/integrations/dismiss.rs b/crates/application/src/integrations/dismiss.rs index 54dc88d..c8d690b 100644 --- a/crates/application/src/integrations/dismiss.rs +++ b/crates/application/src/integrations/dismiss.rs @@ -1,12 +1,18 @@ +use std::sync::Arc; + use domain::{ errors::DomainError, models::WatchEventStatus, + ports::WatchEventRepository, value_objects::{UserId, WatchEventId}, }; -use crate::{context::AppContext, integrations::commands::DismissWatchEventsCommand}; +use crate::integrations::commands::DismissWatchEventsCommand; -pub async fn execute(ctx: &AppContext, cmd: DismissWatchEventsCommand) -> Result { +pub async fn execute( + watch_event: Arc, + cmd: DismissWatchEventsCommand, +) -> Result { let user_id = UserId::from_uuid(cmd.user_id); if cmd.event_ids.is_empty() { return Ok(0); @@ -18,7 +24,7 @@ pub async fn execute(ctx: &AppContext, cmd: DismissWatchEventsCommand) -> Result .map(|id| WatchEventId::from_uuid(*id)) .collect(); - let events = ctx.repos.watch_event.get_by_ids(&ids).await?; + let events = watch_event.get_by_ids(&ids).await?; if events.len() != ids.len() { return Err(DomainError::NotFound( @@ -31,9 +37,7 @@ pub async fn execute(ctx: &AppContext, cmd: DismissWatchEventsCommand) -> Result } } - let count = ctx - .repos - .watch_event + let count = watch_event .update_status_batch(&ids, WatchEventStatus::Dismissed) .await?; diff --git a/crates/application/src/integrations/generate_token.rs b/crates/application/src/integrations/generate_token.rs index 62ff1e5..c6102ea 100644 --- a/crates/application/src/integrations/generate_token.rs +++ b/crates/application/src/integrations/generate_token.rs @@ -1,7 +1,9 @@ -use domain::{errors::DomainError, models::WebhookToken, value_objects::UserId}; +use std::sync::Arc; + +use domain::{errors::DomainError, models::WebhookToken, ports::WebhookTokenRepository, value_objects::UserId}; use sha2::{Digest, Sha256}; -use crate::{context::AppContext, integrations::commands::GenerateWebhookTokenCommand}; +use crate::integrations::commands::GenerateWebhookTokenCommand; pub struct GeneratedWebhookToken { pub token_plaintext: String, @@ -9,7 +11,7 @@ pub struct GeneratedWebhookToken { } pub async fn execute( - ctx: &AppContext, + webhook_token: Arc, cmd: GenerateWebhookTokenCommand, ) -> Result { let plaintext = generate_random_token(); @@ -18,7 +20,7 @@ pub async fn execute( let user_id = UserId::from_uuid(cmd.user_id); let token = WebhookToken::new(user_id, hash, cmd.provider, cmd.label); - ctx.repos.webhook_token.save(&token).await?; + webhook_token.save(&token).await?; Ok(GeneratedWebhookToken { token_plaintext: plaintext, diff --git a/crates/application/src/integrations/get_queue.rs b/crates/application/src/integrations/get_queue.rs index 3a5e164..ef2d943 100644 --- a/crates/application/src/integrations/get_queue.rs +++ b/crates/application/src/integrations/get_queue.rs @@ -1,13 +1,15 @@ -use domain::{errors::DomainError, models::WatchEvent, value_objects::UserId}; +use std::sync::Arc; -use crate::{context::AppContext, integrations::queries::GetWatchQueueQuery}; +use domain::{errors::DomainError, models::WatchEvent, ports::WatchEventRepository, value_objects::UserId}; + +use crate::integrations::queries::GetWatchQueueQuery; pub async fn execute( - ctx: &AppContext, + watch_event: Arc, query: GetWatchQueueQuery, ) -> Result, DomainError> { let user_id = UserId::from_uuid(query.user_id); - ctx.repos.watch_event.list_pending(&user_id).await + watch_event.list_pending(&user_id).await } #[cfg(test)] diff --git a/crates/application/src/integrations/get_tokens.rs b/crates/application/src/integrations/get_tokens.rs index d912255..2bac36c 100644 --- a/crates/application/src/integrations/get_tokens.rs +++ b/crates/application/src/integrations/get_tokens.rs @@ -1,13 +1,15 @@ -use domain::{errors::DomainError, models::WebhookToken, value_objects::UserId}; +use std::sync::Arc; -use crate::{context::AppContext, integrations::queries::GetWebhookTokensQuery}; +use domain::{errors::DomainError, models::WebhookToken, ports::WebhookTokenRepository, value_objects::UserId}; + +use crate::integrations::queries::GetWebhookTokensQuery; pub async fn execute( - ctx: &AppContext, + webhook_token: Arc, query: GetWebhookTokensQuery, ) -> Result, DomainError> { let user_id = UserId::from_uuid(query.user_id); - ctx.repos.webhook_token.list_by_user(&user_id).await + webhook_token.list_by_user(&user_id).await } #[cfg(test)] diff --git a/crates/application/src/integrations/ingest.rs b/crates/application/src/integrations/ingest.rs index fc1a1fd..5f69fc0 100644 --- a/crates/application/src/integrations/ingest.rs +++ b/crates/application/src/integrations/ingest.rs @@ -3,23 +3,21 @@ use domain::{ errors::DomainError, events::DomainEvent, models::WatchEvent, ports::MediaServerParser, }; -use crate::{context::AppContext, integrations::commands::IngestWatchEventCommand}; +use crate::integrations::{commands::IngestWatchEventCommand, deps::IngestWatchEventDeps}; pub async fn execute( - ctx: &AppContext, + deps: &IngestWatchEventDeps, cmd: IngestWatchEventCommand, parser: &dyn MediaServerParser, ) -> Result<(), DomainError> { let token_hash = super::generate_token::hash_token(&cmd.token); - let webhook_token = ctx - .repos + let webhook_token = deps .webhook_token .find_by_token_hash(&token_hash) .await? .ok_or_else(|| DomainError::Unauthorized("invalid webhook token".into()))?; - let _ = ctx - .repos + let _ = deps .webhook_token .touch_last_used(webhook_token.id()) .await; @@ -34,8 +32,7 @@ pub async fn execute( if let Some(ref ext_id) = external_metadata_id { let one_hour_ago = chrono::Utc::now().naive_utc() - Duration::hours(1); - if ctx - .repos + if deps .watch_event .find_duplicate(&user_id, ext_id, one_hour_ago) .await? @@ -55,10 +52,9 @@ pub async fn execute( None, ); - ctx.repos.watch_event.save(&event).await?; + deps.watch_event.save(&event).await?; - let _ = ctx - .services + let _ = deps .event_publisher .publish(&DomainEvent::WatchEventIngested { user_id: event.user_id().clone(), diff --git a/crates/application/src/integrations/mod.rs b/crates/application/src/integrations/mod.rs index 2dbca51..fcfef23 100644 --- a/crates/application/src/integrations/mod.rs +++ b/crates/application/src/integrations/mod.rs @@ -1,6 +1,7 @@ pub mod cleanup; pub mod commands; pub mod confirm; +pub mod deps; pub mod dismiss; pub mod generate_token; pub mod get_queue; diff --git a/crates/application/src/integrations/revoke_token.rs b/crates/application/src/integrations/revoke_token.rs index 23ed200..b245adf 100644 --- a/crates/application/src/integrations/revoke_token.rs +++ b/crates/application/src/integrations/revoke_token.rs @@ -1,14 +1,20 @@ +use std::sync::Arc; + use domain::{ errors::DomainError, + ports::WebhookTokenRepository, value_objects::{UserId, WebhookTokenId}, }; -use crate::{context::AppContext, integrations::commands::RevokeWebhookTokenCommand}; +use crate::integrations::commands::RevokeWebhookTokenCommand; -pub async fn execute(ctx: &AppContext, cmd: RevokeWebhookTokenCommand) -> Result<(), DomainError> { +pub async fn execute( + webhook_token: Arc, + cmd: RevokeWebhookTokenCommand, +) -> Result<(), DomainError> { let user_id = UserId::from_uuid(cmd.user_id); let token_id = WebhookTokenId::from_uuid(cmd.token_id); - ctx.repos.webhook_token.delete(&token_id, &user_id).await + webhook_token.delete(&token_id, &user_id).await } #[cfg(test)] diff --git a/crates/application/src/integrations/tests/cleanup.rs b/crates/application/src/integrations/tests/cleanup.rs index d4f5cf2..bd1ec38 100644 --- a/crates/application/src/integrations/tests/cleanup.rs +++ b/crates/application/src/integrations/tests/cleanup.rs @@ -1,11 +1,15 @@ +use std::sync::Arc; + +use domain::ports::WatchEventRepository; +use domain::testing::InMemoryWatchEventRepository; + use crate::integrations::cleanup; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn returns_zero_when_nothing_to_clean() { - let ctx = TestContextBuilder::new().build(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); - let count = cleanup::execute(&ctx).await.unwrap(); + let count = cleanup::execute(watch_events).await.unwrap(); assert_eq!(count, 0); } diff --git a/crates/application/src/integrations/tests/confirm.rs b/crates/application/src/integrations/tests/confirm.rs index d075323..a217740 100644 --- a/crates/application/src/integrations/tests/confirm.rs +++ b/crates/application/src/integrations/tests/confirm.rs @@ -8,12 +8,15 @@ use uuid::Uuid; use crate::integrations::commands::{ConfirmWatchEventsCommand, WatchEventConfirmation}; use crate::integrations::confirm; -use crate::test_helpers::TestContextBuilder; +use crate::test_helpers::NoopReviewLogger; + +fn noop_logger() -> Arc { + Arc::new(NoopReviewLogger) +} #[tokio::test] async fn confirms_watch_event_via_review_logger() { - let watch_events = InMemoryWatchEventRepository::new(); - let events = NoopEventPublisher::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let uid = Uuid::new_v4(); let event = WatchEvent::new( @@ -28,13 +31,9 @@ async fn confirms_watch_event_via_review_logger() { let event_id = event.id().value(); watch_events.save(&event).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&watch_events) as _) - .with_event_publisher(Arc::clone(&events) as _) - .build(); - let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + noop_logger(), ConfirmWatchEventsCommand { user_id: uid, confirmations: vec![WatchEventConfirmation { @@ -52,10 +51,11 @@ async fn confirms_watch_event_via_review_logger() { #[tokio::test] async fn empty_confirmations_returns_zero() { - let ctx = TestContextBuilder::new().build(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + noop_logger(), ConfirmWatchEventsCommand { user_id: Uuid::new_v4(), confirmations: vec![], @@ -69,8 +69,7 @@ async fn empty_confirmations_returns_zero() { #[tokio::test] async fn confirms_event_with_external_metadata_id_and_no_movie_id() { - let watch_events = InMemoryWatchEventRepository::new(); - let events = NoopEventPublisher::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let uid = Uuid::new_v4(); let event = WatchEvent::new( @@ -85,13 +84,9 @@ async fn confirms_event_with_external_metadata_id_and_no_movie_id() { let event_id = event.id().value(); watch_events.save(&event).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&watch_events) as _) - .with_event_publisher(Arc::clone(&events) as _) - .build(); - let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + noop_logger(), ConfirmWatchEventsCommand { user_id: uid, confirmations: vec![WatchEventConfirmation { @@ -109,7 +104,7 @@ async fn confirms_event_with_external_metadata_id_and_no_movie_id() { #[tokio::test] async fn rejects_other_users_event() { - let watch_events = InMemoryWatchEventRepository::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let owner = Uuid::new_v4(); let intruder = Uuid::new_v4(); @@ -125,12 +120,9 @@ async fn rejects_other_users_event() { let event_id = event.id().value(); watch_events.save(&event).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&watch_events) as _) - .build(); - let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + noop_logger(), ConfirmWatchEventsCommand { user_id: intruder, confirmations: vec![WatchEventConfirmation { @@ -147,10 +139,11 @@ async fn rejects_other_users_event() { #[tokio::test] async fn fails_when_event_not_found() { - let ctx = TestContextBuilder::new().build(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + noop_logger(), ConfirmWatchEventsCommand { user_id: Uuid::new_v4(), confirmations: vec![WatchEventConfirmation { @@ -167,7 +160,7 @@ async fn fails_when_event_not_found() { #[tokio::test] async fn confirms_event_with_movie_id() { - let watch_events = InMemoryWatchEventRepository::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let events = NoopEventPublisher::new(); let uid = Uuid::new_v4(); let movie_uuid = Uuid::new_v4(); @@ -199,23 +192,18 @@ async fn confirms_event_with_movie_id() { // Build a real review logger let reviews = domain::testing::InMemoryReviewRepository::new(); let watchlist = domain::testing::InMemoryWatchlistRepository::new(); - let review_logger = std::sync::Arc::new(crate::diary::review_logger::DefaultReviewLogger::new( - std::sync::Arc::clone(&movies) as _, - std::sync::Arc::clone(&reviews) as _, - std::sync::Arc::clone(&watchlist) as _, - std::sync::Arc::new(domain::testing::FakeMetadataClient) as _, - std::sync::Arc::clone(&events) as _, - )); - - let ctx = TestContextBuilder::new() - .with_watch_events(std::sync::Arc::clone(&watch_events) as _) - .with_event_publisher(std::sync::Arc::clone(&events) as _) - .with_movies(std::sync::Arc::clone(&movies) as _) - .with_review_logger(review_logger as _) - .build(); + let review_logger: Arc = + Arc::new(crate::diary::review_logger::DefaultReviewLogger::new( + Arc::clone(&movies) as _, + Arc::clone(&reviews) as _, + Arc::clone(&watchlist) as _, + Arc::new(domain::testing::FakeMetadataClient) as _, + Arc::clone(&events) as _, + )); let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + review_logger, ConfirmWatchEventsCommand { user_id: uid, confirmations: vec![WatchEventConfirmation { @@ -233,8 +221,7 @@ async fn confirms_event_with_movie_id() { #[tokio::test] async fn confirms_event_without_movie_id_and_without_external_metadata_id() { - let watch_events = InMemoryWatchEventRepository::new(); - let events = NoopEventPublisher::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let uid = Uuid::new_v4(); let event = WatchEvent::new( @@ -249,13 +236,9 @@ async fn confirms_event_without_movie_id_and_without_external_metadata_id() { let event_id = event.id().value(); watch_events.save(&event).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&watch_events) as _) - .with_event_publisher(Arc::clone(&events) as _) - .build(); - let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + noop_logger(), ConfirmWatchEventsCommand { user_id: uid, confirmations: vec![WatchEventConfirmation { @@ -273,8 +256,7 @@ async fn confirms_event_without_movie_id_and_without_external_metadata_id() { #[tokio::test] async fn confirms_multiple_events() { - let watch_events = InMemoryWatchEventRepository::new(); - let events = NoopEventPublisher::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let uid = Uuid::new_v4(); let event1 = WatchEvent::new( @@ -302,13 +284,9 @@ async fn confirms_multiple_events() { watch_events.save(&event1).await.unwrap(); watch_events.save(&event2).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&watch_events) as _) - .with_event_publisher(Arc::clone(&events) as _) - .build(); - let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + noop_logger(), ConfirmWatchEventsCommand { user_id: uid, confirmations: vec![ @@ -333,14 +311,13 @@ async fn confirms_multiple_events() { #[tokio::test] async fn confirms_event_without_year() { - let watch_events = InMemoryWatchEventRepository::new(); - let events = NoopEventPublisher::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let uid = Uuid::new_v4(); let event = WatchEvent::new( UserId::from_uuid(uid), "No Year Movie".into(), - None, // no year + None, None, WatchEventSource::Jellyfin, chrono::Utc::now().naive_utc(), @@ -349,13 +326,9 @@ async fn confirms_event_without_year() { let event_id = event.id().value(); watch_events.save(&event).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&watch_events) as _) - .with_event_publisher(Arc::clone(&events) as _) - .build(); - let result = confirm::execute( - &ctx, + Arc::clone(&watch_events), + noop_logger(), ConfirmWatchEventsCommand { user_id: uid, confirmations: vec![WatchEventConfirmation { diff --git a/crates/application/src/integrations/tests/dismiss.rs b/crates/application/src/integrations/tests/dismiss.rs index da9c87b..0faf5b0 100644 --- a/crates/application/src/integrations/tests/dismiss.rs +++ b/crates/application/src/integrations/tests/dismiss.rs @@ -7,17 +7,13 @@ use domain::value_objects::UserId; use uuid::Uuid; use crate::integrations::{commands::DismissWatchEventsCommand, dismiss}; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn dismisses_empty_list_returns_zero() { - let events = InMemoryWatchEventRepository::new(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&events) as _) - .build(); + let events: Arc = InMemoryWatchEventRepository::new(); let result = dismiss::execute( - &ctx, + Arc::clone(&events), DismissWatchEventsCommand { user_id: Uuid::new_v4(), event_ids: vec![], @@ -31,13 +27,10 @@ async fn dismisses_empty_list_returns_zero() { #[tokio::test] async fn fails_when_event_not_found() { - let events = InMemoryWatchEventRepository::new(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&events) as _) - .build(); + let events: Arc = InMemoryWatchEventRepository::new(); let result = dismiss::execute( - &ctx, + Arc::clone(&events), DismissWatchEventsCommand { user_id: Uuid::new_v4(), event_ids: vec![Uuid::new_v4()], @@ -50,7 +43,7 @@ async fn fails_when_event_not_found() { #[tokio::test] async fn dismisses_existing_events() { - let watch_events = InMemoryWatchEventRepository::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); let uid = Uuid::new_v4(); let user_id = UserId::from_uuid(uid); @@ -77,12 +70,8 @@ async fn dismisses_existing_events() { watch_events.save(&e1).await.unwrap(); watch_events.save(&e2).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&watch_events) as _) - .build(); - let result = dismiss::execute( - &ctx, + Arc::clone(&watch_events), DismissWatchEventsCommand { user_id: uid, event_ids: vec![id1, id2], diff --git a/crates/application/src/integrations/tests/generate_token.rs b/crates/application/src/integrations/tests/generate_token.rs index b2b02d9..8127cb7 100644 --- a/crates/application/src/integrations/tests/generate_token.rs +++ b/crates/application/src/integrations/tests/generate_token.rs @@ -1,22 +1,19 @@ use std::sync::Arc; use domain::models::WatchEventSource; +use domain::ports::WebhookTokenRepository; use domain::testing::InMemoryWebhookTokenRepository; use uuid::Uuid; use crate::integrations::{commands::GenerateWebhookTokenCommand, generate_token}; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn generates_token_and_saves() { - let tokens = InMemoryWebhookTokenRepository::new(); - let ctx = TestContextBuilder::new() - .with_webhook_tokens(Arc::clone(&tokens) as _) - .build(); + let tokens: Arc = InMemoryWebhookTokenRepository::new(); let user_id = Uuid::new_v4(); let result = generate_token::execute( - &ctx, + Arc::clone(&tokens), GenerateWebhookTokenCommand { user_id, provider: WatchEventSource::Jellyfin, @@ -28,9 +25,7 @@ async fn generates_token_and_saves() { assert!(!result.token_plaintext.is_empty()); - let saved = ctx - .repos - .webhook_token + let saved = tokens .list_by_user(&domain::value_objects::UserId::from_uuid(user_id)) .await .unwrap(); diff --git a/crates/application/src/integrations/tests/get_queue.rs b/crates/application/src/integrations/tests/get_queue.rs index 302b233..e73b6e9 100644 --- a/crates/application/src/integrations/tests/get_queue.rs +++ b/crates/application/src/integrations/tests/get_queue.rs @@ -8,17 +8,13 @@ use domain::value_objects::UserId; use uuid::Uuid; use crate::integrations::{get_queue, queries::GetWatchQueueQuery}; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn returns_empty_when_no_events() { - let events = InMemoryWatchEventRepository::new(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&events) as _) - .build(); + let events: Arc = InMemoryWatchEventRepository::new(); let result = get_queue::execute( - &ctx, + Arc::clone(&events), GetWatchQueueQuery { user_id: Uuid::new_v4(), }, @@ -31,10 +27,7 @@ async fn returns_empty_when_no_events() { #[tokio::test] async fn returns_pending_events() { - let events = InMemoryWatchEventRepository::new(); - let ctx = TestContextBuilder::new() - .with_watch_events(Arc::clone(&events) as _) - .build(); + let events: Arc = InMemoryWatchEventRepository::new(); let user_id = Uuid::new_v4(); let event = WatchEvent::new( @@ -48,7 +41,7 @@ async fn returns_pending_events() { ); events.save(&event).await.unwrap(); - let result = get_queue::execute(&ctx, GetWatchQueueQuery { user_id }) + let result = get_queue::execute(Arc::clone(&events), GetWatchQueueQuery { user_id }) .await .unwrap(); diff --git a/crates/application/src/integrations/tests/get_tokens.rs b/crates/application/src/integrations/tests/get_tokens.rs index 81d13ec..05bb0c3 100644 --- a/crates/application/src/integrations/tests/get_tokens.rs +++ b/crates/application/src/integrations/tests/get_tokens.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use domain::models::WatchEventSource; +use domain::ports::WebhookTokenRepository; use domain::testing::InMemoryWebhookTokenRepository; use uuid::Uuid; @@ -8,17 +9,13 @@ use crate::integrations::{ commands::GenerateWebhookTokenCommand, generate_token, get_tokens, queries::GetWebhookTokensQuery, }; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn returns_empty_when_no_tokens() { - let tokens = InMemoryWebhookTokenRepository::new(); - let ctx = TestContextBuilder::new() - .with_webhook_tokens(Arc::clone(&tokens) as _) - .build(); + let tokens: Arc = InMemoryWebhookTokenRepository::new(); let result = get_tokens::execute( - &ctx, + Arc::clone(&tokens), GetWebhookTokensQuery { user_id: Uuid::new_v4(), }, @@ -31,15 +28,12 @@ async fn returns_empty_when_no_tokens() { #[tokio::test] async fn returns_tokens_after_generate() { - let tokens = InMemoryWebhookTokenRepository::new(); - let ctx = TestContextBuilder::new() - .with_webhook_tokens(Arc::clone(&tokens) as _) - .build(); + let tokens: Arc = InMemoryWebhookTokenRepository::new(); let user_id = Uuid::new_v4(); generate_token::execute( - &ctx, + Arc::clone(&tokens), GenerateWebhookTokenCommand { user_id, provider: WatchEventSource::Jellyfin, @@ -50,7 +44,7 @@ async fn returns_tokens_after_generate() { .unwrap(); generate_token::execute( - &ctx, + Arc::clone(&tokens), GenerateWebhookTokenCommand { user_id, provider: WatchEventSource::Plex, @@ -60,7 +54,7 @@ async fn returns_tokens_after_generate() { .await .unwrap(); - let result = get_tokens::execute(&ctx, GetWebhookTokensQuery { user_id }) + let result = get_tokens::execute(Arc::clone(&tokens), GetWebhookTokensQuery { user_id }) .await .unwrap(); diff --git a/crates/application/src/integrations/tests/ingest.rs b/crates/application/src/integrations/tests/ingest.rs index 66eb49a..68d165a 100644 --- a/crates/application/src/integrations/tests/ingest.rs +++ b/crates/application/src/integrations/tests/ingest.rs @@ -1,12 +1,13 @@ use std::sync::Arc; use domain::models::WatchEventSource; -use domain::testing::InMemoryWebhookTokenRepository; +use domain::ports::{EventPublisher, WatchEventRepository, WebhookTokenRepository}; +use domain::testing::{InMemoryWebhookTokenRepository, InMemoryWatchEventRepository, NoopEventPublisher}; use uuid::Uuid; -use crate::integrations::commands::GenerateWebhookTokenCommand; -use crate::integrations::{commands::IngestWatchEventCommand, generate_token, ingest}; -use crate::test_helpers::TestContextBuilder; +use crate::integrations::commands::{GenerateWebhookTokenCommand, IngestWatchEventCommand}; +use crate::integrations::deps::IngestWatchEventDeps; +use crate::integrations::{generate_token, ingest}; struct FakeParser; @@ -26,14 +27,13 @@ impl domain::ports::MediaServerParser for FakeParser { #[tokio::test] async fn ingests_watch_event() { - let tokens = InMemoryWebhookTokenRepository::new(); - let ctx = TestContextBuilder::new() - .with_webhook_tokens(Arc::clone(&tokens) as _) - .build(); + let tokens: Arc = InMemoryWebhookTokenRepository::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); + let event_publisher: Arc = NoopEventPublisher::new(); let user_id = Uuid::new_v4(); let generated = generate_token::execute( - &ctx, + Arc::clone(&tokens), GenerateWebhookTokenCommand { user_id, provider: WatchEventSource::Jellyfin, @@ -43,8 +43,14 @@ async fn ingests_watch_event() { .await .unwrap(); + let deps = IngestWatchEventDeps { + webhook_token: Arc::clone(&tokens), + watch_event: Arc::clone(&watch_events), + event_publisher: Arc::clone(&event_publisher), + }; + let result = ingest::execute( - &ctx, + &deps, IngestWatchEventCommand { token: generated.token_plaintext, raw_payload: vec![], @@ -59,10 +65,18 @@ async fn ingests_watch_event() { #[tokio::test] async fn rejects_invalid_token() { - let ctx = TestContextBuilder::new().build(); + let tokens: Arc = InMemoryWebhookTokenRepository::new(); + let watch_events: Arc = InMemoryWatchEventRepository::new(); + let event_publisher: Arc = NoopEventPublisher::new(); + + let deps = IngestWatchEventDeps { + webhook_token: Arc::clone(&tokens), + watch_event: Arc::clone(&watch_events), + event_publisher: Arc::clone(&event_publisher), + }; let result = ingest::execute( - &ctx, + &deps, IngestWatchEventCommand { token: "bad-token".into(), raw_payload: vec![], diff --git a/crates/application/src/integrations/tests/revoke_token.rs b/crates/application/src/integrations/tests/revoke_token.rs index 9cfbc5d..b8136c3 100644 --- a/crates/application/src/integrations/tests/revoke_token.rs +++ b/crates/application/src/integrations/tests/revoke_token.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use domain::models::WatchEventSource; +use domain::ports::WebhookTokenRepository; use domain::testing::InMemoryWebhookTokenRepository; use uuid::Uuid; @@ -10,19 +11,15 @@ use crate::integrations::{ queries::GetWebhookTokensQuery, revoke_token, }; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn revokes_existing_token() { - let tokens = InMemoryWebhookTokenRepository::new(); - let ctx = TestContextBuilder::new() - .with_webhook_tokens(Arc::clone(&tokens) as _) - .build(); + let tokens: Arc = InMemoryWebhookTokenRepository::new(); let user_id = Uuid::new_v4(); let generated = generate_token::execute( - &ctx, + Arc::clone(&tokens), GenerateWebhookTokenCommand { user_id, provider: WatchEventSource::Jellyfin, @@ -34,11 +31,14 @@ async fn revokes_existing_token() { let token_id = generated.token.id().value(); - revoke_token::execute(&ctx, RevokeWebhookTokenCommand { user_id, token_id }) - .await - .unwrap(); + revoke_token::execute( + Arc::clone(&tokens), + RevokeWebhookTokenCommand { user_id, token_id }, + ) + .await + .unwrap(); - let remaining = get_tokens::execute(&ctx, GetWebhookTokensQuery { user_id }) + let remaining = get_tokens::execute(Arc::clone(&tokens), GetWebhookTokensQuery { user_id }) .await .unwrap(); diff --git a/crates/application/src/jobs/watch_event_cleanup.rs b/crates/application/src/jobs/watch_event_cleanup.rs index a29aeb4..635c321 100644 --- a/crates/application/src/jobs/watch_event_cleanup.rs +++ b/crates/application/src/jobs/watch_event_cleanup.rs @@ -1,17 +1,16 @@ +use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use domain::{errors::DomainError, ports::PeriodicJob}; - -use crate::context::AppContext; +use domain::{errors::DomainError, ports::{PeriodicJob, WatchEventRepository}}; pub struct WatchEventCleanupJob { - ctx: AppContext, + watch_event: Arc, } impl WatchEventCleanupJob { - pub fn new(ctx: AppContext) -> Self { - Self { ctx } + pub fn new(watch_event: Arc) -> Self { + Self { watch_event } } } @@ -22,7 +21,7 @@ impl PeriodicJob for WatchEventCleanupJob { } async fn run(&self) -> Result<(), DomainError> { - let n = crate::integrations::cleanup::execute(&self.ctx).await?; + let n = crate::integrations::cleanup::execute(self.watch_event.clone()).await?; if n > 0 { tracing::info!("watch event cleanup: removed {n} old entries"); } diff --git a/crates/presentation/src/handlers/integrations.rs b/crates/presentation/src/handlers/integrations.rs index 3adacbf..167ac55 100644 --- a/crates/presentation/src/handlers/integrations.rs +++ b/crates/presentation/src/handlers/integrations.rs @@ -46,7 +46,7 @@ pub async fn get_integrations_page( let query = GetWebhookTokensQuery { user_id: user_id.value(), }; - let tokens = get_webhook_tokens::execute(&state.app_ctx, query) + let tokens = get_webhook_tokens::execute(state.app_ctx.repos.webhook_token.clone(), query) .await .unwrap_or_default(); @@ -86,7 +86,7 @@ pub async fn post_generate_token( label: form.label.filter(|l| !l.trim().is_empty()), }; - match generate_webhook_token::execute(&state.app_ctx, cmd).await { + match generate_webhook_token::execute(state.app_ctx.repos.webhook_token.clone(), cmd).await { Ok(result) => { let encoded = percent_encoding::utf8_percent_encode( &result.token_plaintext, @@ -116,7 +116,7 @@ pub async fn post_revoke_token( user_id: user_id.value(), token_id, }; - if let Err(e) = revoke_webhook_token::execute(&state.app_ctx, cmd).await { + if let Err(e) = revoke_webhook_token::execute(state.app_ctx.repos.webhook_token.clone(), cmd).await { tracing::error!("revoke token failed: {:?}", e); } @@ -138,7 +138,7 @@ pub async fn get_watch_queue_page( let query = GetWatchQueueQuery { user_id: user_id.value(), }; - let events = get_watch_queue::execute(&state.app_ctx, query) + let events = get_watch_queue::execute(state.app_ctx.repos.watch_event.clone(), query) .await .unwrap_or_default(); @@ -175,7 +175,13 @@ pub async fn post_confirm_single( }], }; - match confirm_watch_events::execute(&state.app_ctx, cmd).await { + match confirm_watch_events::execute( + state.app_ctx.repos.watch_event.clone(), + state.app_ctx.services.review_logger.clone(), + cmd, + ) + .await + { Ok(_) => Redirect::to("/watch-queue").into_response(), Err(e) => { let msg = encode_error(&e.to_string()); @@ -200,7 +206,7 @@ pub async fn post_dismiss_single( event_ids: vec![event_id], }; - match dismiss_watch_events::execute(&state.app_ctx, cmd).await { + match dismiss_watch_events::execute(state.app_ctx.repos.watch_event.clone(), cmd).await { Ok(_) => Redirect::to("/watch-queue").into_response(), Err(e) => { let msg = encode_error(&e.to_string()); diff --git a/crates/presentation/src/handlers/webhook.rs b/crates/presentation/src/handlers/webhook.rs index 58c2a58..2581545 100644 --- a/crates/presentation/src/handlers/webhook.rs +++ b/crates/presentation/src/handlers/webhook.rs @@ -15,10 +15,10 @@ use application::integrations::{ ConfirmWatchEventsCommand, DismissWatchEventsCommand, GenerateWebhookTokenCommand, IngestWatchEventCommand, RevokeWebhookTokenCommand, WatchEventConfirmation, }, - confirm as confirm_watch_events, dismiss as dismiss_watch_events, - generate_token as generate_webhook_token, get_queue as get_watch_queue, - get_tokens as get_webhook_tokens, ingest as ingest_watch_event, - queries::{GetWatchQueueQuery, GetWebhookTokensQuery}, + confirm as confirm_watch_events, deps::IngestWatchEventDeps, + dismiss as dismiss_watch_events, generate_token as generate_webhook_token, + get_queue as get_watch_queue, get_tokens as get_webhook_tokens, + ingest as ingest_watch_event, queries::{GetWatchQueueQuery, GetWebhookTokensQuery}, revoke_token as revoke_webhook_token, }; use domain::models::WatchEventSource; @@ -126,7 +126,12 @@ async fn run_ingest( cmd: IngestWatchEventCommand, parser: &dyn domain::ports::MediaServerParser, ) -> StatusCode { - match ingest_watch_event::execute(&state.app_ctx, cmd, parser).await { + let deps = IngestWatchEventDeps { + webhook_token: state.app_ctx.repos.webhook_token.clone(), + watch_event: state.app_ctx.repos.watch_event.clone(), + event_publisher: state.app_ctx.services.event_publisher.clone(), + }; + match ingest_watch_event::execute(&deps, cmd, parser).await { Ok(()) => StatusCode::OK, Err(e) => crate::errors::domain_error_status(&e), } @@ -159,7 +164,7 @@ pub async fn post_generate_webhook_token( label: req.label, }; - let result = generate_webhook_token::execute(&state.app_ctx, cmd).await?; + let result = generate_webhook_token::execute(state.app_ctx.repos.webhook_token.clone(), cmd).await?; let base_url = &state.app_ctx.config.base_url; let webhook_url = format!("{base_url}/api/v1/webhooks/{provider}"); @@ -186,7 +191,7 @@ pub async fn get_webhook_tokens( let query = GetWebhookTokensQuery { user_id: user.0.value(), }; - let tokens = get_webhook_tokens::execute(&state.app_ctx, query).await?; + let tokens = get_webhook_tokens::execute(state.app_ctx.repos.webhook_token.clone(), query).await?; let dtos = tokens .into_iter() @@ -221,7 +226,7 @@ pub async fn delete_webhook_token( user_id: user.0.value(), token_id: id, }; - revoke_webhook_token::execute(&state.app_ctx, cmd).await?; + revoke_webhook_token::execute(state.app_ctx.repos.webhook_token.clone(), cmd).await?; Ok(StatusCode::NO_CONTENT) } @@ -242,7 +247,7 @@ pub async fn get_watch_queue( let query = GetWatchQueueQuery { user_id: user.0.value(), }; - let events = get_watch_queue::execute(&state.app_ctx, query).await?; + let events = get_watch_queue::execute(state.app_ctx.repos.watch_event.clone(), query).await?; let dtos = events .into_iter() @@ -287,7 +292,12 @@ pub async fn post_confirm_watch_events( .collect(), }; - let confirmed = confirm_watch_events::execute(&state.app_ctx, cmd).await?; + let confirmed = confirm_watch_events::execute( + state.app_ctx.repos.watch_event.clone(), + state.app_ctx.services.review_logger.clone(), + cmd, + ) + .await?; Ok(Json(ConfirmWatchResponse { confirmed })) } @@ -311,6 +321,6 @@ pub async fn post_dismiss_watch_events( event_ids: req.event_ids, }; - let dismissed = dismiss_watch_events::execute(&state.app_ctx, cmd).await?; + let dismissed = dismiss_watch_events::execute(state.app_ctx.repos.watch_event.clone(), cmd).await?; Ok(Json(DismissWatchResponse { dismissed })) } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index dcf8fa2..098c7bd 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -175,7 +175,7 @@ async fn main() -> anyhow::Result<()> { let mut periodic_jobs: Vec> = vec![ Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.repos.import_session.clone())), - Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())), + Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.repos.watch_event.clone())), Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())), Arc::new(application::jobs::WrapUpCleanupJob::new(ctx.clone())), Arc::new(application::jobs::RefreshSessionCleanupJob::new(