diff --git a/Cargo.lock b/Cargo.lock index c268fef..cbfe683 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,6 +305,7 @@ dependencies = [ "async-trait", "chrono", "domain", + "futures", "tokio", "tracing", "uuid", @@ -1516,6 +1517,7 @@ version = "0.1.0" dependencies = [ "async-trait", "domain", + "futures", "tokio", "tracing", ] @@ -5939,6 +5941,35 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "worker" +version = "0.1.0" +dependencies = [ + "anyhow", + "application", + "async-trait", + "auth", + "chrono", + "domain", + "dotenvy", + "event-publisher", + "export", + "futures", + "metadata", + "poster-fetcher", + "poster-storage", + "postgres", + "serde", + "serde_json", + "sqlite", + "sqlx", + "thiserror 2.0.18", + "tokio", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "writeable" version = "0.6.3" diff --git a/Cargo.toml b/Cargo.toml index 1532dbf..90a90db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "crates/presentation", "crates/tui", "crates/doc", + "crates/worker", ] resolver = "2" diff --git a/Dockerfile b/Dockerfile index 8d058cd..ae6bd78 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,6 +26,7 @@ COPY crates/domain/Cargo.toml crates/domain/Cargo.toml COPY crates/presentation/Cargo.toml crates/presentation/Cargo.toml COPY crates/doc/Cargo.toml crates/doc/Cargo.toml COPY crates/tui/Cargo.toml crates/tui/Cargo.toml +COPY crates/worker/Cargo.toml crates/worker/Cargo.toml # Stub every crate so cargo can resolve and fetch deps RUN find crates -name "Cargo.toml" | sed 's|/Cargo.toml||' | \ @@ -42,7 +43,7 @@ COPY crates ./crates # To build with PostgreSQL backend instead: # --build-arg FEATURES=postgres,postgres-federation ARG FEATURES=sqlite,sqlite-federation -RUN cargo build --release -p presentation --no-default-features --features "${FEATURES}" +RUN cargo build --release -p presentation -p worker --no-default-features --features "${FEATURES}" # ----- runtime ----- FROM debian:bookworm-slim @@ -54,6 +55,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ WORKDIR /app COPY --from=builder /build/target/release/presentation ./presentation +COPY --from=builder /build/target/release/worker ./worker COPY static ./static EXPOSE 3000 diff --git a/crates/adapters/event-publisher/Cargo.toml b/crates/adapters/event-publisher/Cargo.toml index d7044a8..bf60be1 100644 --- a/crates/adapters/event-publisher/Cargo.toml +++ b/crates/adapters/event-publisher/Cargo.toml @@ -8,3 +8,4 @@ domain = { workspace = true } async-trait = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +futures = { workspace = true } diff --git a/crates/adapters/event-publisher/src/lib.rs b/crates/adapters/event-publisher/src/lib.rs index 1b2b5b3..d38763b 100644 --- a/crates/adapters/event-publisher/src/lib.rs +++ b/crates/adapters/event-publisher/src/lib.rs @@ -1,5 +1,12 @@ use async_trait::async_trait; -use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use domain::{ + errors::DomainError, + events::{AckHandle, DomainEvent, EventEnvelope}, + ports::{EventConsumer, EventPublisher}, +}; +use futures::stream::{self, BoxStream}; +use std::sync::Arc; +use tokio::sync::Mutex; use tokio::sync::mpsc; pub use domain::ports::EventHandler; @@ -32,65 +39,26 @@ impl EventPublisher for ChannelEventPublisher { } } -pub struct EventWorker { - receiver: mpsc::Receiver, - handlers: Vec>, +struct NoopAck; + +#[async_trait] +impl AckHandle for NoopAck { + async fn ack(&self) -> Result<(), DomainError> { Ok(()) } + async fn nack(&self) -> Result<(), DomainError> { Ok(()) } } -impl EventWorker { - pub async fn run(mut self) { - while let Some(event) = self.receiver.recv().await { - match &event { - DomainEvent::ReviewLogged { - review_id, - movie_id, - user_id, - rating, - watched_at, - } => { - tracing::info!( - review_id = %review_id.value(), - movie_id = %movie_id.value(), - user_id = %user_id.value(), - rating = rating.value(), - watched_at = %watched_at, - "event: review_logged" - ); - } - DomainEvent::ReviewUpdated { - review_id, - movie_id, - user_id, - rating, - watched_at, - } => { - tracing::info!( - review_id = %review_id.value(), - movie_id = %movie_id.value(), - user_id = %user_id.value(), - rating = rating.value(), - watched_at = %watched_at, - "event: review_updated" - ); - } - DomainEvent::MovieDiscovered { - movie_id, - external_metadata_id, - } => { - tracing::info!( - movie_id = %movie_id.value(), - external_id = external_metadata_id.value(), - "event: movie_discovered" - ); - } - } - for handler in &self.handlers { - if let Err(e) = handler.handle(&event).await { - tracing::error!("event handler error: {e}"); - } - } - } - tracing::info!("event worker shut down"); +pub struct ChannelEventConsumer { + receiver: Arc>>, +} + +impl EventConsumer for ChannelEventConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let receiver = Arc::clone(&self.receiver); + Box::pin(stream::unfold(receiver, |rx| async move { + let event = rx.lock().await.recv().await?; + let envelope = EventEnvelope::new(event, Box::new(NoopAck)); + Some((Ok(envelope), rx)) + })) } } @@ -105,14 +73,12 @@ impl EventPublisher for NoopEventPublisher { pub fn create_event_channel( config: EventPublisherConfig, - handlers: Vec>, -) -> (ChannelEventPublisher, EventWorker) { +) -> (ChannelEventPublisher, ChannelEventConsumer) { let (tx, rx) = mpsc::channel(config.channel_buffer); ( ChannelEventPublisher { sender: tx }, - EventWorker { - receiver: rx, - handlers, + ChannelEventConsumer { + receiver: Arc::new(Mutex::new(rx)), }, ) } @@ -121,107 +87,56 @@ pub fn create_event_channel( mod tests { use super::*; use domain::{ - errors::DomainError, events::DomainEvent, value_objects::{ExternalMetadataId, MovieId}, }; - use std::sync::{Arc, Mutex}; + use futures::StreamExt; - struct RecordingHandler { - calls: Arc>>, - } - - #[async_trait] - impl EventHandler for RecordingHandler { - async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { - let label = match event { - DomainEvent::ReviewLogged { .. } => "review_logged", - DomainEvent::ReviewUpdated { .. } => "review_updated", - DomainEvent::MovieDiscovered { .. } => "movie_discovered", - }; - self.calls.lock().unwrap().push(label.to_string()); - Ok(()) - } - } - - #[tokio::test] - async fn single_handler_receives_event() { - let calls = Arc::new(Mutex::new(vec![])); - let handler = RecordingHandler { - calls: Arc::clone(&calls), - }; - let config = EventPublisherConfig { channel_buffer: 8 }; - let (publisher, worker) = create_event_channel(config, vec![Box::new(handler)]); - - let handle = tokio::spawn(worker.run()); - - let event = DomainEvent::MovieDiscovered { + fn movie_discovered() -> DomainEvent { + DomainEvent::MovieDiscovered { movie_id: MovieId::generate(), external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), - }; - publisher.publish(&event).await.unwrap(); - drop(publisher); - handle.await.unwrap(); - - assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]); - } - - #[tokio::test] - async fn multiple_handlers_all_receive_event() { - let calls1 = Arc::new(Mutex::new(vec![])); - let calls2 = Arc::new(Mutex::new(vec![])); - let handler1 = RecordingHandler { - calls: Arc::clone(&calls1), - }; - let handler2 = RecordingHandler { - calls: Arc::clone(&calls2), - }; - let config = EventPublisherConfig { channel_buffer: 8 }; - let (publisher, worker) = - create_event_channel(config, vec![Box::new(handler1), Box::new(handler2)]); - - let handle = tokio::spawn(worker.run()); - - let event = DomainEvent::MovieDiscovered { - movie_id: MovieId::generate(), - external_metadata_id: ExternalMetadataId::new("tt9999999".into()).unwrap(), - }; - publisher.publish(&event).await.unwrap(); - drop(publisher); - handle.await.unwrap(); - - assert_eq!(calls1.lock().unwrap().len(), 1); - assert_eq!(calls2.lock().unwrap().len(), 1); - } - - #[tokio::test] - async fn handler_error_does_not_stop_worker() { - struct FailingHandler; - #[async_trait] - impl EventHandler for FailingHandler { - async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> { - Err(DomainError::InfrastructureError("boom".into())) - } } + } - let calls = Arc::new(Mutex::new(vec![])); - let good = RecordingHandler { - calls: Arc::clone(&calls), - }; + #[tokio::test] + async fn consumer_yields_published_events() { let config = EventPublisherConfig { channel_buffer: 8 }; - let (publisher, worker) = - create_event_channel(config, vec![Box::new(FailingHandler), Box::new(good)]); + let (publisher, consumer) = create_event_channel(config); - let handle = tokio::spawn(worker.run()); - - let event = DomainEvent::MovieDiscovered { - movie_id: MovieId::generate(), - external_metadata_id: ExternalMetadataId::new("tt0000001".into()).unwrap(), - }; - publisher.publish(&event).await.unwrap(); + publisher.publish(&movie_discovered()).await.unwrap(); drop(publisher); - handle.await.unwrap(); - assert_eq!(calls.lock().unwrap().len(), 1); + let mut stream = consumer.consume(); + let envelope = stream.next().await.unwrap().unwrap(); + assert!(matches!(envelope.event, DomainEvent::MovieDiscovered { .. })); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn consumer_yields_multiple_events_in_order() { + let config = EventPublisherConfig { channel_buffer: 8 }; + let (publisher, consumer) = create_event_channel(config); + + publisher.publish(&movie_discovered()).await.unwrap(); + publisher.publish(&movie_discovered()).await.unwrap(); + drop(publisher); + + let mut stream = consumer.consume(); + let first = stream.next().await.unwrap().unwrap(); + let second = stream.next().await.unwrap().unwrap(); + assert!(matches!(first.event, DomainEvent::MovieDiscovered { .. })); + assert!(matches!(second.event, DomainEvent::MovieDiscovered { .. })); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn stream_ends_when_publisher_dropped() { + let config = EventPublisherConfig { channel_buffer: 8 }; + let (publisher, consumer) = create_event_channel(config); + drop(publisher); + + let mut stream = consumer.consume(); + assert!(stream.next().await.is_none()); } } diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index 28ed626..cb58687 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -9,6 +9,8 @@ domain = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } tracing = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } [dev-dependencies] tokio = { workspace = true } diff --git a/crates/presentation/src/event_handlers.rs b/crates/application/src/event_handlers.rs similarity index 95% rename from crates/presentation/src/event_handlers.rs rename to crates/application/src/event_handlers.rs index 8692814..2720de4 100644 --- a/crates/presentation/src/event_handlers.rs +++ b/crates/application/src/event_handlers.rs @@ -1,10 +1,11 @@ use std::time::Duration; -use application::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster}; use async_trait::async_trait; use domain::ports::EventHandler; use domain::{errors::DomainError, events::DomainEvent}; +use crate::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster}; + pub struct PosterSyncHandler { ctx: AppContext, max_retries: u32, diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index 9aed4a0..2007e9c 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -1,4 +1,6 @@ pub mod commands; +pub mod event_handlers; +pub mod worker; pub mod config; pub mod context; pub mod movie_resolver; diff --git a/crates/application/src/worker.rs b/crates/application/src/worker.rs new file mode 100644 index 0000000..01e1927 --- /dev/null +++ b/crates/application/src/worker.rs @@ -0,0 +1,215 @@ +use std::sync::Arc; + +use domain::{ + events::EventEnvelope, + ports::{EventConsumer, EventHandler}, +}; +use futures::StreamExt; + +pub struct WorkerService { + consumer: Arc, + handlers: Vec>, +} + +impl WorkerService { + pub fn new(consumer: Arc, handlers: Vec>) -> Self { + Self { consumer, handlers } + } + + pub async fn run(self) { + let mut stream = self.consumer.consume(); + while let Some(result) = stream.next().await { + match result { + Ok(envelope) => self.dispatch(envelope).await, + Err(e) => tracing::error!("event consumer error: {e}"), + } + } + tracing::info!("event stream ended, worker shutting down"); + } + + async fn dispatch(&self, envelope: EventEnvelope) { + let mut all_ok = true; + for handler in &self.handlers { + if let Err(e) = handler.handle(&envelope.event).await { + tracing::error!("event handler error: {e}"); + all_ok = false; + } + } + let result = if all_ok { + envelope.ack().await + } else { + envelope.nack().await + }; + if let Err(e) = result { + tracing::error!("ack/nack failed: {e}"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use domain::{errors::DomainError, events::{AckHandle, DomainEvent}}; + use domain::value_objects::{ExternalMetadataId, MovieId}; + use futures::{stream, stream::BoxStream}; + use std::sync::{Arc, Mutex}; + + struct NoopAck; + + #[async_trait] + impl AckHandle for NoopAck { + async fn ack(&self) -> Result<(), DomainError> { Ok(()) } + async fn nack(&self) -> Result<(), DomainError> { Ok(()) } + } + + struct VecConsumer { + events: Vec, + } + + impl EventConsumer for VecConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let envelopes: Vec> = self + .events + .iter() + .cloned() + .map(|e| Ok(EventEnvelope::new(e, Box::new(NoopAck)))) + .collect(); + Box::pin(stream::iter(envelopes)) + } + } + + struct RecordingHandler { + calls: Arc>>, + } + + #[async_trait] + impl EventHandler for RecordingHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let label = match event { + DomainEvent::MovieDiscovered { .. } => "movie_discovered", + DomainEvent::ReviewLogged { .. } => "review_logged", + DomainEvent::ReviewUpdated { .. } => "review_updated", + }; + self.calls.lock().unwrap().push(label); + Ok(()) + } + } + + fn movie_discovered() -> DomainEvent { + DomainEvent::MovieDiscovered { + movie_id: MovieId::generate(), + external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), + } + } + + #[tokio::test] + async fn dispatches_to_all_handlers() { + let calls = Arc::new(Mutex::new(vec![])); + let consumer = VecConsumer { events: vec![movie_discovered()] }; + let handler = RecordingHandler { calls: Arc::clone(&calls) }; + + WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)]) + .run() + .await; + + assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]); + } + + #[tokio::test] + async fn nacks_when_handler_fails() { + let nack_called = Arc::new(Mutex::new(false)); + + struct TrackingAck { + nack_called: Arc>, + } + + #[async_trait] + impl AckHandle for TrackingAck { + async fn ack(&self) -> Result<(), DomainError> { Ok(()) } + async fn nack(&self) -> Result<(), DomainError> { + *self.nack_called.lock().unwrap() = true; + Ok(()) + } + } + + struct TrackingConsumer { + event: DomainEvent, + nack_called: Arc>, + } + + impl EventConsumer for TrackingConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let envelope = EventEnvelope::new( + self.event.clone(), + Box::new(TrackingAck { nack_called: Arc::clone(&self.nack_called) }), + ); + Box::pin(stream::iter(vec![Ok(envelope)])) + } + } + + struct FailingHandler; + + #[async_trait] + impl EventHandler for FailingHandler { + async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> { + Err(DomainError::InfrastructureError("boom".into())) + } + } + + let consumer = TrackingConsumer { + event: movie_discovered(), + nack_called: Arc::clone(&nack_called), + }; + + WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)]) + .run() + .await; + + assert!(*nack_called.lock().unwrap()); + } + + #[tokio::test] + async fn acks_when_all_handlers_succeed() { + let ack_called = Arc::new(Mutex::new(false)); + + struct TrackingAck { + ack_called: Arc>, + } + + #[async_trait] + impl AckHandle for TrackingAck { + async fn ack(&self) -> Result<(), DomainError> { + *self.ack_called.lock().unwrap() = true; + Ok(()) + } + async fn nack(&self) -> Result<(), DomainError> { Ok(()) } + } + + struct TrackingConsumer { + event: DomainEvent, + ack_called: Arc>, + } + + impl EventConsumer for TrackingConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let envelope = EventEnvelope::new( + self.event.clone(), + Box::new(TrackingAck { ack_called: Arc::clone(&self.ack_called) }), + ); + Box::pin(stream::iter(vec![Ok(envelope)])) + } + } + + let consumer = TrackingConsumer { + event: movie_discovered(), + ack_called: Arc::clone(&ack_called), + }; + + WorkerService::new(Arc::new(consumer), vec![]) + .run() + .await; + + assert!(*ack_called.lock().unwrap()); + } +} diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 48b0603..7ff5bf4 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -1,6 +1,10 @@ +use async_trait::async_trait; use chrono::NaiveDateTime; -use crate::value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}; +use crate::{ + errors::DomainError, + value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, +}; #[derive(Clone, Debug)] pub enum DomainEvent { @@ -23,3 +27,28 @@ pub enum DomainEvent { external_metadata_id: ExternalMetadataId, }, } + +#[async_trait] +pub trait AckHandle: Send + Sync { + async fn ack(&self) -> Result<(), DomainError>; + async fn nack(&self) -> Result<(), DomainError>; +} + +pub struct EventEnvelope { + pub event: DomainEvent, + ack: Box, +} + +impl EventEnvelope { + pub fn new(event: DomainEvent, ack: Box) -> Self { + Self { event, ack } + } + + pub async fn ack(self) -> Result<(), DomainError> { + self.ack.ack().await + } + + pub async fn nack(self) -> Result<(), DomainError> { + self.ack.nack().await + } +} diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index eb00ab3..8782535 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use crate::{ errors::DomainError, - events::DomainEvent, + events::{DomainEvent, EventEnvelope}, models::{ DiaryEntry, DiaryFilter, ExportFormat, FeedEntry, Movie, Review, ReviewHistory, User, UserStats, UserSummary, UserTrends, @@ -174,9 +174,10 @@ pub trait EventPublisher: Send + Sync { } pub trait EventConsumer: Send + Sync { - /// Returns a stream of domain events. Implementations decide whether this - /// is push-based (NATS) or poll-based (DB queue) — callers don't care. - fn consume(&self) -> futures::stream::BoxStream<'_, Result>; + /// Returns a stream of event envelopes. Each envelope carries a domain event + /// and an ack handle — callers ack after successful dispatch, nack on failure. + /// Implementations decide transport (NATS, DB queue, in-memory channel). + fn consume(&self) -> futures::stream::BoxStream<'_, Result>; } #[async_trait] diff --git a/crates/presentation/src/lib.rs b/crates/presentation/src/lib.rs index 1e00fbe..5be283f 100644 --- a/crates/presentation/src/lib.rs +++ b/crates/presentation/src/lib.rs @@ -1,7 +1,6 @@ pub mod csrf; pub mod dtos; pub mod errors; -pub mod event_handlers; pub mod extractors; pub mod handlers; pub mod openapi; diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 4408a5d..9504581 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use anyhow::Context; use event_publisher::{EventPublisherConfig, NoopEventPublisher, create_event_channel}; -use presentation::event_handlers::PosterSyncHandler; +use application::event_handlers::PosterSyncHandler; use std::str::FromStr; use tokio::net::TcpListener; @@ -24,7 +24,7 @@ use activitypub::{ ReviewObjectHandler, }; -use application::{config::AppConfig, context::AppContext}; +use application::{config::AppConfig, context::AppContext, worker::WorkerService}; use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService}; use export::ExportAdapter; use metadata::MetadataClientImpl; @@ -184,12 +184,13 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { ); let ap_service_arc: Arc = concrete_ap_service; - let poster_handler = PosterSyncHandler::new(handler_ctx, 3); - let (event_publisher, event_worker) = create_event_channel( - EventPublisherConfig::from_env(), - vec![Box::new(poster_handler), Box::new(ap_event_handler)], + let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3)); + let (event_publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); + let worker = WorkerService::new( + Arc::new(consumer), + vec![poster_handler, Arc::new(ap_event_handler)], ); - tokio::spawn(event_worker.run()); + tokio::spawn(worker.run()); let ep: Arc = Arc::new(event_publisher); (ep, ap_router, ap_service_arc, social_query_arc) @@ -197,12 +198,10 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { #[cfg(not(feature = "federation"))] let (event_publisher_arc, ap_router): (Arc, axum::Router) = { - let poster_handler = PosterSyncHandler::new(handler_ctx, 3); - let (event_publisher, event_worker) = create_event_channel( - EventPublisherConfig::from_env(), - vec![Box::new(poster_handler)], - ); - tokio::spawn(event_worker.run()); + let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3)); + let (event_publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); + let worker = WorkerService::new(Arc::new(consumer), vec![poster_handler]); + tokio::spawn(worker.run()); (Arc::new(event_publisher), axum::Router::new()) }; diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml new file mode 100644 index 0000000..8d93b08 --- /dev/null +++ b/crates/worker/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "worker" +version = "0.1.0" +edition = "2024" + +[features] +default = ["sqlite"] +sqlite = ["dep:sqlite"] +postgres = ["dep:postgres"] + +[dependencies] +domain = { workspace = true } +application = { workspace = true } +event-publisher = { workspace = true } +tokio = { workspace = true } +anyhow = { workspace = true } +thiserror = { workspace = true } +chrono = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +futures = { workspace = true } +dotenvy = { workspace = true } +uuid = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +auth = { workspace = true } +metadata = { workspace = true } +poster-fetcher = { workspace = true } +poster-storage = { workspace = true } +export = { workspace = true } +sqlx = { workspace = true } + +# Optional — database backends +sqlite = { workspace = true, optional = true } +postgres = { workspace = true, optional = true } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs new file mode 100644 index 0000000..37a2a17 --- /dev/null +++ b/crates/worker/src/main.rs @@ -0,0 +1,177 @@ +use std::sync::Arc; +use std::str::FromStr; + +use anyhow::Context; +use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService}; +use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService}; +use event_publisher::{EventPublisherConfig, create_event_channel}; +use export::ExportAdapter; +use metadata::MetadataClientImpl; +use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher}; +use poster_storage::{PosterStorageAdapter, StorageConfig}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +#[cfg(feature = "sqlite")] +use sqlite::{SqliteMovieRepository, SqliteUserRepository}; + +#[cfg(feature = "postgres")] +use postgres::{PostgresRepository, PostgresUserRepository}; + +use domain::ports::{ + AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository, + PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository, + UserRepository, +}; + +#[cfg(not(any(feature = "sqlite", feature = "postgres")))] +compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres"); + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + dotenvy::dotenv().ok(); + init_tracing(); + + let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?; + let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string()); + let auth_config = AuthConfig::from_env()?; + let storage_config = StorageConfig::from_env()?; + let app_config = AppConfig::from_env(); + + let metadata_client: Arc = + if let Ok(tmdb_key) = std::env::var("TMDB_API_KEY") { + Arc::new(MetadataClientImpl::new_tmdb(tmdb_key)) + } else { + let omdb_key = std::env::var("OMDB_API_KEY") + .context("Either TMDB_API_KEY or OMDB_API_KEY must be set")?; + Arc::new(MetadataClientImpl::new_omdb(omdb_key)) + }; + let poster_fetcher: Arc = + Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?); + let poster_storage: Arc = + Arc::new(PosterStorageAdapter::from_config(storage_config)); + let auth_service: Arc = Arc::new(JwtAuthService::new(auth_config)); + let password_hasher: Arc = Arc::new(Argon2PasswordHasher); + + let (movie_repository, review_repository, diary_repository, stats_repository, user_repository): + (Arc, Arc, Arc, + Arc, Arc) = + match backend.as_str() { + #[cfg(feature = "postgres")] + "postgres" => { + let (_, m, r, d, s, u) = wire_postgres(&database_url).await?; + (m, r, d, s, u) + } + #[cfg(feature = "sqlite")] + _ => { + let (_, m, r, d, s, u) = wire_sqlite(&database_url).await?; + (m, r, d, s, u) + } + #[cfg(not(feature = "sqlite"))] + _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"), + }; + + let (event_publisher_arc, consumer) = { + let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); + (Arc::new(publisher) as Arc, consumer) + }; + + let ctx = AppContext { + movie_repository, + review_repository, + diary_repository, + diary_exporter: Arc::new(ExportAdapter) as Arc, + stats_repository, + metadata_client, + poster_fetcher, + poster_storage, + event_publisher: event_publisher_arc, + auth_service, + password_hasher, + user_repository, + config: app_config, + }; + + let poster_handler = Arc::new(PosterSyncHandler::new(ctx, 3)); + let worker = WorkerService::new(Arc::new(consumer), vec![poster_handler]); + + tracing::info!("worker started"); + worker.run().await; + tracing::info!("worker stopped"); + + Ok(()) +} + +fn init_tracing() { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info".into())) + .with(tracing_subscriber::fmt::layer()) + .init(); +} + +#[cfg(feature = "sqlite")] +async fn wire_sqlite(database_url: &str) -> anyhow::Result<( + sqlx::SqlitePool, + Arc, + Arc, + Arc, + Arc, + Arc, +)> { + use sqlx::sqlite::SqliteConnectOptions; + + let opts = SqliteConnectOptions::from_str(database_url) + .context("Invalid DATABASE_URL")? + .create_if_missing(true) + .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) + .busy_timeout(std::time::Duration::from_secs(5)); + let pool = sqlx::SqlitePool::connect_with(opts) + .await + .context("Failed to connect to SQLite database")?; + + let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone())); + sqlite_repo + .migrate() + .await + .map_err(|e| anyhow::anyhow!("{}", e)) + .context("Database migration failed")?; + + let movie_repository: Arc = Arc::clone(&sqlite_repo) as _; + let review_repository: Arc = Arc::clone(&sqlite_repo) as _; + let diary_repository: Arc = Arc::clone(&sqlite_repo) as _; + let stats_repository: Arc = Arc::clone(&sqlite_repo) as _; + let user_repository: Arc = + Arc::new(SqliteUserRepository::new(pool.clone())); + + Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) +} + +#[cfg(feature = "postgres")] +async fn wire_postgres(database_url: &str) -> anyhow::Result<( + sqlx::PgPool, + Arc, + Arc, + Arc, + Arc, + Arc, +)> { + let pool = sqlx::PgPool::connect(database_url) + .await + .context("Failed to connect to PostgreSQL database")?; + + let pg_repo = Arc::new(PostgresRepository::new(pool.clone())); + pg_repo + .migrate() + .await + .map_err(|e| anyhow::anyhow!("{}", e)) + .context("Database migration failed")?; + + let movie_repository: Arc = Arc::clone(&pg_repo) as _; + let review_repository: Arc = Arc::clone(&pg_repo) as _; + let diary_repository: Arc = Arc::clone(&pg_repo) as _; + let stats_repository: Arc = Arc::clone(&pg_repo) as _; + let user_repository: Arc = + Arc::new(PostgresUserRepository::new(pool.clone())); + + Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) +}