background worker
This commit is contained in:
31
Cargo.lock
generated
31
Cargo.lock
generated
@@ -305,6 +305,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
"futures",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
@@ -1516,6 +1517,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"domain",
|
||||
"futures",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
@@ -5939,6 +5941,35 @@ dependencies = [
|
||||
"wasmparser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "worker"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"application",
|
||||
"async-trait",
|
||||
"auth",
|
||||
"chrono",
|
||||
"domain",
|
||||
"dotenvy",
|
||||
"event-publisher",
|
||||
"export",
|
||||
"futures",
|
||||
"metadata",
|
||||
"poster-fetcher",
|
||||
"poster-storage",
|
||||
"postgres",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlite",
|
||||
"sqlx",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "writeable"
|
||||
version = "0.6.3"
|
||||
|
||||
@@ -19,6 +19,7 @@ members = [
|
||||
"crates/presentation",
|
||||
"crates/tui",
|
||||
"crates/doc",
|
||||
"crates/worker",
|
||||
]
|
||||
resolver = "2"
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ COPY crates/domain/Cargo.toml crates/domain/Cargo.toml
|
||||
COPY crates/presentation/Cargo.toml crates/presentation/Cargo.toml
|
||||
COPY crates/doc/Cargo.toml crates/doc/Cargo.toml
|
||||
COPY crates/tui/Cargo.toml crates/tui/Cargo.toml
|
||||
COPY crates/worker/Cargo.toml crates/worker/Cargo.toml
|
||||
|
||||
# Stub every crate so cargo can resolve and fetch deps
|
||||
RUN find crates -name "Cargo.toml" | sed 's|/Cargo.toml||' | \
|
||||
@@ -42,7 +43,7 @@ COPY crates ./crates
|
||||
# To build with PostgreSQL backend instead:
|
||||
# --build-arg FEATURES=postgres,postgres-federation
|
||||
ARG FEATURES=sqlite,sqlite-federation
|
||||
RUN cargo build --release -p presentation --no-default-features --features "${FEATURES}"
|
||||
RUN cargo build --release -p presentation -p worker --no-default-features --features "${FEATURES}"
|
||||
|
||||
# ----- runtime -----
|
||||
FROM debian:bookworm-slim
|
||||
@@ -54,6 +55,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
WORKDIR /app
|
||||
|
||||
COPY --from=builder /build/target/release/presentation ./presentation
|
||||
COPY --from=builder /build/target/release/worker ./worker
|
||||
COPY static ./static
|
||||
|
||||
EXPOSE 3000
|
||||
|
||||
@@ -8,3 +8,4 @@ domain = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
use async_trait::async_trait;
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::{AckHandle, DomainEvent, EventEnvelope},
|
||||
ports::{EventConsumer, EventPublisher},
|
||||
};
|
||||
use futures::stream::{self, BoxStream};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub use domain::ports::EventHandler;
|
||||
@@ -32,65 +39,26 @@ impl EventPublisher for ChannelEventPublisher {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EventWorker {
|
||||
receiver: mpsc::Receiver<DomainEvent>,
|
||||
handlers: Vec<Box<dyn EventHandler>>,
|
||||
struct NoopAck;
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for NoopAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> { Ok(()) }
|
||||
async fn nack(&self) -> Result<(), DomainError> { Ok(()) }
|
||||
}
|
||||
|
||||
impl EventWorker {
|
||||
pub async fn run(mut self) {
|
||||
while let Some(event) = self.receiver.recv().await {
|
||||
match &event {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id,
|
||||
movie_id,
|
||||
user_id,
|
||||
rating,
|
||||
watched_at,
|
||||
} => {
|
||||
tracing::info!(
|
||||
review_id = %review_id.value(),
|
||||
movie_id = %movie_id.value(),
|
||||
user_id = %user_id.value(),
|
||||
rating = rating.value(),
|
||||
watched_at = %watched_at,
|
||||
"event: review_logged"
|
||||
);
|
||||
}
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id,
|
||||
movie_id,
|
||||
user_id,
|
||||
rating,
|
||||
watched_at,
|
||||
} => {
|
||||
tracing::info!(
|
||||
review_id = %review_id.value(),
|
||||
movie_id = %movie_id.value(),
|
||||
user_id = %user_id.value(),
|
||||
rating = rating.value(),
|
||||
watched_at = %watched_at,
|
||||
"event: review_updated"
|
||||
);
|
||||
}
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id,
|
||||
external_metadata_id,
|
||||
} => {
|
||||
tracing::info!(
|
||||
movie_id = %movie_id.value(),
|
||||
external_id = external_metadata_id.value(),
|
||||
"event: movie_discovered"
|
||||
);
|
||||
}
|
||||
}
|
||||
for handler in &self.handlers {
|
||||
if let Err(e) = handler.handle(&event).await {
|
||||
tracing::error!("event handler error: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!("event worker shut down");
|
||||
pub struct ChannelEventConsumer {
|
||||
receiver: Arc<Mutex<mpsc::Receiver<DomainEvent>>>,
|
||||
}
|
||||
|
||||
impl EventConsumer for ChannelEventConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let receiver = Arc::clone(&self.receiver);
|
||||
Box::pin(stream::unfold(receiver, |rx| async move {
|
||||
let event = rx.lock().await.recv().await?;
|
||||
let envelope = EventEnvelope::new(event, Box::new(NoopAck));
|
||||
Some((Ok(envelope), rx))
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,14 +73,12 @@ impl EventPublisher for NoopEventPublisher {
|
||||
|
||||
pub fn create_event_channel(
|
||||
config: EventPublisherConfig,
|
||||
handlers: Vec<Box<dyn EventHandler>>,
|
||||
) -> (ChannelEventPublisher, EventWorker) {
|
||||
) -> (ChannelEventPublisher, ChannelEventConsumer) {
|
||||
let (tx, rx) = mpsc::channel(config.channel_buffer);
|
||||
(
|
||||
ChannelEventPublisher { sender: tx },
|
||||
EventWorker {
|
||||
receiver: rx,
|
||||
handlers,
|
||||
ChannelEventConsumer {
|
||||
receiver: Arc::new(Mutex::new(rx)),
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -121,107 +87,56 @@ pub fn create_event_channel(
|
||||
mod tests {
|
||||
use super::*;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId},
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use futures::StreamExt;
|
||||
|
||||
struct RecordingHandler {
|
||||
calls: Arc<Mutex<Vec<String>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for RecordingHandler {
|
||||
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let label = match event {
|
||||
DomainEvent::ReviewLogged { .. } => "review_logged",
|
||||
DomainEvent::ReviewUpdated { .. } => "review_updated",
|
||||
DomainEvent::MovieDiscovered { .. } => "movie_discovered",
|
||||
};
|
||||
self.calls.lock().unwrap().push(label.to_string());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn single_handler_receives_event() {
|
||||
let calls = Arc::new(Mutex::new(vec![]));
|
||||
let handler = RecordingHandler {
|
||||
calls: Arc::clone(&calls),
|
||||
};
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, worker) = create_event_channel(config, vec![Box::new(handler)]);
|
||||
|
||||
let handle = tokio::spawn(worker.run());
|
||||
|
||||
let event = DomainEvent::MovieDiscovered {
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::generate(),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
};
|
||||
publisher.publish(&event).await.unwrap();
|
||||
drop(publisher);
|
||||
handle.await.unwrap();
|
||||
|
||||
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multiple_handlers_all_receive_event() {
|
||||
let calls1 = Arc::new(Mutex::new(vec![]));
|
||||
let calls2 = Arc::new(Mutex::new(vec![]));
|
||||
let handler1 = RecordingHandler {
|
||||
calls: Arc::clone(&calls1),
|
||||
};
|
||||
let handler2 = RecordingHandler {
|
||||
calls: Arc::clone(&calls2),
|
||||
};
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, worker) =
|
||||
create_event_channel(config, vec![Box::new(handler1), Box::new(handler2)]);
|
||||
|
||||
let handle = tokio::spawn(worker.run());
|
||||
|
||||
let event = DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::generate(),
|
||||
external_metadata_id: ExternalMetadataId::new("tt9999999".into()).unwrap(),
|
||||
};
|
||||
publisher.publish(&event).await.unwrap();
|
||||
drop(publisher);
|
||||
handle.await.unwrap();
|
||||
|
||||
assert_eq!(calls1.lock().unwrap().len(), 1);
|
||||
assert_eq!(calls2.lock().unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handler_error_does_not_stop_worker() {
|
||||
struct FailingHandler;
|
||||
#[async_trait]
|
||||
impl EventHandler for FailingHandler {
|
||||
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
|
||||
Err(DomainError::InfrastructureError("boom".into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let calls = Arc::new(Mutex::new(vec![]));
|
||||
let good = RecordingHandler {
|
||||
calls: Arc::clone(&calls),
|
||||
};
|
||||
#[tokio::test]
|
||||
async fn consumer_yields_published_events() {
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, worker) =
|
||||
create_event_channel(config, vec![Box::new(FailingHandler), Box::new(good)]);
|
||||
let (publisher, consumer) = create_event_channel(config);
|
||||
|
||||
let handle = tokio::spawn(worker.run());
|
||||
|
||||
let event = DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::generate(),
|
||||
external_metadata_id: ExternalMetadataId::new("tt0000001".into()).unwrap(),
|
||||
};
|
||||
publisher.publish(&event).await.unwrap();
|
||||
publisher.publish(&movie_discovered()).await.unwrap();
|
||||
drop(publisher);
|
||||
handle.await.unwrap();
|
||||
|
||||
assert_eq!(calls.lock().unwrap().len(), 1);
|
||||
let mut stream = consumer.consume();
|
||||
let envelope = stream.next().await.unwrap().unwrap();
|
||||
assert!(matches!(envelope.event, DomainEvent::MovieDiscovered { .. }));
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn consumer_yields_multiple_events_in_order() {
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, consumer) = create_event_channel(config);
|
||||
|
||||
publisher.publish(&movie_discovered()).await.unwrap();
|
||||
publisher.publish(&movie_discovered()).await.unwrap();
|
||||
drop(publisher);
|
||||
|
||||
let mut stream = consumer.consume();
|
||||
let first = stream.next().await.unwrap().unwrap();
|
||||
let second = stream.next().await.unwrap().unwrap();
|
||||
assert!(matches!(first.event, DomainEvent::MovieDiscovered { .. }));
|
||||
assert!(matches!(second.event, DomainEvent::MovieDiscovered { .. }));
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stream_ends_when_publisher_dropped() {
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, consumer) = create_event_channel(config);
|
||||
drop(publisher);
|
||||
|
||||
let mut stream = consumer.consume();
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,8 @@ domain = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true }
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use application::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster};
|
||||
use async_trait::async_trait;
|
||||
use domain::ports::EventHandler;
|
||||
use domain::{errors::DomainError, events::DomainEvent};
|
||||
|
||||
use crate::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster};
|
||||
|
||||
pub struct PosterSyncHandler {
|
||||
ctx: AppContext,
|
||||
max_retries: u32,
|
||||
@@ -1,4 +1,6 @@
|
||||
pub mod commands;
|
||||
pub mod event_handlers;
|
||||
pub mod worker;
|
||||
pub mod config;
|
||||
pub mod context;
|
||||
pub mod movie_resolver;
|
||||
|
||||
215
crates/application/src/worker.rs
Normal file
215
crates/application/src/worker.rs
Normal file
@@ -0,0 +1,215 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use domain::{
|
||||
events::EventEnvelope,
|
||||
ports::{EventConsumer, EventHandler},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
|
||||
pub struct WorkerService {
|
||||
consumer: Arc<dyn EventConsumer>,
|
||||
handlers: Vec<Arc<dyn EventHandler>>,
|
||||
}
|
||||
|
||||
impl WorkerService {
|
||||
pub fn new(consumer: Arc<dyn EventConsumer>, handlers: Vec<Arc<dyn EventHandler>>) -> Self {
|
||||
Self { consumer, handlers }
|
||||
}
|
||||
|
||||
pub async fn run(self) {
|
||||
let mut stream = self.consumer.consume();
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(envelope) => self.dispatch(envelope).await,
|
||||
Err(e) => tracing::error!("event consumer error: {e}"),
|
||||
}
|
||||
}
|
||||
tracing::info!("event stream ended, worker shutting down");
|
||||
}
|
||||
|
||||
async fn dispatch(&self, envelope: EventEnvelope) {
|
||||
let mut all_ok = true;
|
||||
for handler in &self.handlers {
|
||||
if let Err(e) = handler.handle(&envelope.event).await {
|
||||
tracing::error!("event handler error: {e}");
|
||||
all_ok = false;
|
||||
}
|
||||
}
|
||||
let result = if all_ok {
|
||||
envelope.ack().await
|
||||
} else {
|
||||
envelope.nack().await
|
||||
};
|
||||
if let Err(e) = result {
|
||||
tracing::error!("ack/nack failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_trait::async_trait;
|
||||
use domain::{errors::DomainError, events::{AckHandle, DomainEvent}};
|
||||
use domain::value_objects::{ExternalMetadataId, MovieId};
|
||||
use futures::{stream, stream::BoxStream};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
struct NoopAck;
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for NoopAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> { Ok(()) }
|
||||
async fn nack(&self) -> Result<(), DomainError> { Ok(()) }
|
||||
}
|
||||
|
||||
struct VecConsumer {
|
||||
events: Vec<DomainEvent>,
|
||||
}
|
||||
|
||||
impl EventConsumer for VecConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let envelopes: Vec<Result<EventEnvelope, DomainError>> = self
|
||||
.events
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|e| Ok(EventEnvelope::new(e, Box::new(NoopAck))))
|
||||
.collect();
|
||||
Box::pin(stream::iter(envelopes))
|
||||
}
|
||||
}
|
||||
|
||||
struct RecordingHandler {
|
||||
calls: Arc<Mutex<Vec<&'static str>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for RecordingHandler {
|
||||
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let label = match event {
|
||||
DomainEvent::MovieDiscovered { .. } => "movie_discovered",
|
||||
DomainEvent::ReviewLogged { .. } => "review_logged",
|
||||
DomainEvent::ReviewUpdated { .. } => "review_updated",
|
||||
};
|
||||
self.calls.lock().unwrap().push(label);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::generate(),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dispatches_to_all_handlers() {
|
||||
let calls = Arc::new(Mutex::new(vec![]));
|
||||
let consumer = VecConsumer { events: vec![movie_discovered()] };
|
||||
let handler = RecordingHandler { calls: Arc::clone(&calls) };
|
||||
|
||||
WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)])
|
||||
.run()
|
||||
.await;
|
||||
|
||||
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn nacks_when_handler_fails() {
|
||||
let nack_called = Arc::new(Mutex::new(false));
|
||||
|
||||
struct TrackingAck {
|
||||
nack_called: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for TrackingAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> { Ok(()) }
|
||||
async fn nack(&self) -> Result<(), DomainError> {
|
||||
*self.nack_called.lock().unwrap() = true;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct TrackingConsumer {
|
||||
event: DomainEvent,
|
||||
nack_called: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl EventConsumer for TrackingConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let envelope = EventEnvelope::new(
|
||||
self.event.clone(),
|
||||
Box::new(TrackingAck { nack_called: Arc::clone(&self.nack_called) }),
|
||||
);
|
||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||
}
|
||||
}
|
||||
|
||||
struct FailingHandler;
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for FailingHandler {
|
||||
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
|
||||
Err(DomainError::InfrastructureError("boom".into()))
|
||||
}
|
||||
}
|
||||
|
||||
let consumer = TrackingConsumer {
|
||||
event: movie_discovered(),
|
||||
nack_called: Arc::clone(&nack_called),
|
||||
};
|
||||
|
||||
WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)])
|
||||
.run()
|
||||
.await;
|
||||
|
||||
assert!(*nack_called.lock().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn acks_when_all_handlers_succeed() {
|
||||
let ack_called = Arc::new(Mutex::new(false));
|
||||
|
||||
struct TrackingAck {
|
||||
ack_called: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for TrackingAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> {
|
||||
*self.ack_called.lock().unwrap() = true;
|
||||
Ok(())
|
||||
}
|
||||
async fn nack(&self) -> Result<(), DomainError> { Ok(()) }
|
||||
}
|
||||
|
||||
struct TrackingConsumer {
|
||||
event: DomainEvent,
|
||||
ack_called: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl EventConsumer for TrackingConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let envelope = EventEnvelope::new(
|
||||
self.event.clone(),
|
||||
Box::new(TrackingAck { ack_called: Arc::clone(&self.ack_called) }),
|
||||
);
|
||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||
}
|
||||
}
|
||||
|
||||
let consumer = TrackingConsumer {
|
||||
event: movie_discovered(),
|
||||
ack_called: Arc::clone(&ack_called),
|
||||
};
|
||||
|
||||
WorkerService::new(Arc::new(consumer), vec![])
|
||||
.run()
|
||||
.await;
|
||||
|
||||
assert!(*ack_called.lock().unwrap());
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,10 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::NaiveDateTime;
|
||||
|
||||
use crate::value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId};
|
||||
use crate::{
|
||||
errors::DomainError,
|
||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum DomainEvent {
|
||||
@@ -23,3 +27,28 @@ pub enum DomainEvent {
|
||||
external_metadata_id: ExternalMetadataId,
|
||||
},
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait AckHandle: Send + Sync {
|
||||
async fn ack(&self) -> Result<(), DomainError>;
|
||||
async fn nack(&self) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
pub struct EventEnvelope {
|
||||
pub event: DomainEvent,
|
||||
ack: Box<dyn AckHandle>,
|
||||
}
|
||||
|
||||
impl EventEnvelope {
|
||||
pub fn new(event: DomainEvent, ack: Box<dyn AckHandle>) -> Self {
|
||||
Self { event, ack }
|
||||
}
|
||||
|
||||
pub async fn ack(self) -> Result<(), DomainError> {
|
||||
self.ack.ack().await
|
||||
}
|
||||
|
||||
pub async fn nack(self) -> Result<(), DomainError> {
|
||||
self.ack.nack().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
|
||||
|
||||
use crate::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
events::{DomainEvent, EventEnvelope},
|
||||
models::{
|
||||
DiaryEntry, DiaryFilter, ExportFormat, FeedEntry, Movie, Review, ReviewHistory, User,
|
||||
UserStats, UserSummary, UserTrends,
|
||||
@@ -174,9 +174,10 @@ pub trait EventPublisher: Send + Sync {
|
||||
}
|
||||
|
||||
pub trait EventConsumer: Send + Sync {
|
||||
/// Returns a stream of domain events. Implementations decide whether this
|
||||
/// is push-based (NATS) or poll-based (DB queue) — callers don't care.
|
||||
fn consume(&self) -> futures::stream::BoxStream<'_, Result<DomainEvent, DomainError>>;
|
||||
/// Returns a stream of event envelopes. Each envelope carries a domain event
|
||||
/// and an ack handle — callers ack after successful dispatch, nack on failure.
|
||||
/// Implementations decide transport (NATS, DB queue, in-memory channel).
|
||||
fn consume(&self) -> futures::stream::BoxStream<'_, Result<EventEnvelope, DomainError>>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
pub mod csrf;
|
||||
pub mod dtos;
|
||||
pub mod errors;
|
||||
pub mod event_handlers;
|
||||
pub mod extractors;
|
||||
pub mod handlers;
|
||||
pub mod openapi;
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use event_publisher::{EventPublisherConfig, NoopEventPublisher, create_event_channel};
|
||||
use presentation::event_handlers::PosterSyncHandler;
|
||||
use application::event_handlers::PosterSyncHandler;
|
||||
use std::str::FromStr;
|
||||
|
||||
use tokio::net::TcpListener;
|
||||
@@ -24,7 +24,7 @@ use activitypub::{
|
||||
ReviewObjectHandler,
|
||||
};
|
||||
|
||||
use application::{config::AppConfig, context::AppContext};
|
||||
use application::{config::AppConfig, context::AppContext, worker::WorkerService};
|
||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||
use export::ExportAdapter;
|
||||
use metadata::MetadataClientImpl;
|
||||
@@ -184,12 +184,13 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
||||
);
|
||||
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
|
||||
|
||||
let poster_handler = PosterSyncHandler::new(handler_ctx, 3);
|
||||
let (event_publisher, event_worker) = create_event_channel(
|
||||
EventPublisherConfig::from_env(),
|
||||
vec![Box::new(poster_handler), Box::new(ap_event_handler)],
|
||||
let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3));
|
||||
let (event_publisher, consumer) = create_event_channel(EventPublisherConfig::from_env());
|
||||
let worker = WorkerService::new(
|
||||
Arc::new(consumer),
|
||||
vec![poster_handler, Arc::new(ap_event_handler)],
|
||||
);
|
||||
tokio::spawn(event_worker.run());
|
||||
tokio::spawn(worker.run());
|
||||
|
||||
let ep: Arc<dyn domain::ports::EventPublisher> = Arc::new(event_publisher);
|
||||
(ep, ap_router, ap_service_arc, social_query_arc)
|
||||
@@ -197,12 +198,10 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
||||
|
||||
#[cfg(not(feature = "federation"))]
|
||||
let (event_publisher_arc, ap_router): (Arc<dyn domain::ports::EventPublisher>, axum::Router) = {
|
||||
let poster_handler = PosterSyncHandler::new(handler_ctx, 3);
|
||||
let (event_publisher, event_worker) = create_event_channel(
|
||||
EventPublisherConfig::from_env(),
|
||||
vec![Box::new(poster_handler)],
|
||||
);
|
||||
tokio::spawn(event_worker.run());
|
||||
let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3));
|
||||
let (event_publisher, consumer) = create_event_channel(EventPublisherConfig::from_env());
|
||||
let worker = WorkerService::new(Arc::new(consumer), vec![poster_handler]);
|
||||
tokio::spawn(worker.run());
|
||||
(Arc::new(event_publisher), axum::Router::new())
|
||||
};
|
||||
|
||||
|
||||
36
crates/worker/Cargo.toml
Normal file
36
crates/worker/Cargo.toml
Normal file
@@ -0,0 +1,36 @@
|
||||
[package]
|
||||
name = "worker"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
default = ["sqlite"]
|
||||
sqlite = ["dep:sqlite"]
|
||||
postgres = ["dep:postgres"]
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
application = { workspace = true }
|
||||
event-publisher = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
auth = { workspace = true }
|
||||
metadata = { workspace = true }
|
||||
poster-fetcher = { workspace = true }
|
||||
poster-storage = { workspace = true }
|
||||
export = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
|
||||
# Optional — database backends
|
||||
sqlite = { workspace = true, optional = true }
|
||||
postgres = { workspace = true, optional = true }
|
||||
177
crates/worker/src/main.rs
Normal file
177
crates/worker/src/main.rs
Normal file
@@ -0,0 +1,177 @@
|
||||
use std::sync::Arc;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService};
|
||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||
use event_publisher::{EventPublisherConfig, create_event_channel};
|
||||
use export::ExportAdapter;
|
||||
use metadata::MetadataClientImpl;
|
||||
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
|
||||
use poster_storage::{PosterStorageAdapter, StorageConfig};
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
use sqlite::{SqliteMovieRepository, SqliteUserRepository};
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
use postgres::{PostgresRepository, PostgresUserRepository};
|
||||
|
||||
use domain::ports::{
|
||||
AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository,
|
||||
PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository,
|
||||
UserRepository,
|
||||
};
|
||||
|
||||
#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
|
||||
compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres");
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
dotenvy::dotenv().ok();
|
||||
init_tracing();
|
||||
|
||||
let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
|
||||
let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string());
|
||||
let auth_config = AuthConfig::from_env()?;
|
||||
let storage_config = StorageConfig::from_env()?;
|
||||
let app_config = AppConfig::from_env();
|
||||
|
||||
let metadata_client: Arc<dyn MetadataClient> =
|
||||
if let Ok(tmdb_key) = std::env::var("TMDB_API_KEY") {
|
||||
Arc::new(MetadataClientImpl::new_tmdb(tmdb_key))
|
||||
} else {
|
||||
let omdb_key = std::env::var("OMDB_API_KEY")
|
||||
.context("Either TMDB_API_KEY or OMDB_API_KEY must be set")?;
|
||||
Arc::new(MetadataClientImpl::new_omdb(omdb_key))
|
||||
};
|
||||
let poster_fetcher: Arc<dyn PosterFetcherClient> =
|
||||
Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?);
|
||||
let poster_storage: Arc<dyn PosterStorage> =
|
||||
Arc::new(PosterStorageAdapter::from_config(storage_config));
|
||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||
|
||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
||||
(Arc<dyn MovieRepository>, Arc<dyn ReviewRepository>, Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>, Arc<dyn UserRepository>) =
|
||||
match backend.as_str() {
|
||||
#[cfg(feature = "postgres")]
|
||||
"postgres" => {
|
||||
let (_, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||
(m, r, d, s, u)
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => {
|
||||
let (_, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||
(m, r, d, s, u)
|
||||
}
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
|
||||
};
|
||||
|
||||
let (event_publisher_arc, consumer) = {
|
||||
let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env());
|
||||
(Arc::new(publisher) as Arc<dyn domain::ports::EventPublisher>, consumer)
|
||||
};
|
||||
|
||||
let ctx = AppContext {
|
||||
movie_repository,
|
||||
review_repository,
|
||||
diary_repository,
|
||||
diary_exporter: Arc::new(ExportAdapter) as Arc<dyn DiaryExporter>,
|
||||
stats_repository,
|
||||
metadata_client,
|
||||
poster_fetcher,
|
||||
poster_storage,
|
||||
event_publisher: event_publisher_arc,
|
||||
auth_service,
|
||||
password_hasher,
|
||||
user_repository,
|
||||
config: app_config,
|
||||
};
|
||||
|
||||
let poster_handler = Arc::new(PosterSyncHandler::new(ctx, 3));
|
||||
let worker = WorkerService::new(Arc::new(consumer), vec![poster_handler]);
|
||||
|
||||
tracing::info!("worker started");
|
||||
worker.run().await;
|
||||
tracing::info!("worker stopped");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init_tracing() {
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "info".into()))
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
async fn wire_sqlite(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::SqlitePool,
|
||||
Arc<dyn MovieRepository>,
|
||||
Arc<dyn ReviewRepository>,
|
||||
Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>,
|
||||
Arc<dyn UserRepository>,
|
||||
)> {
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
|
||||
let opts = SqliteConnectOptions::from_str(database_url)
|
||||
.context("Invalid DATABASE_URL")?
|
||||
.create_if_missing(true)
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.busy_timeout(std::time::Duration::from_secs(5));
|
||||
let pool = sqlx::SqlitePool::connect_with(opts)
|
||||
.await
|
||||
.context("Failed to connect to SQLite database")?;
|
||||
|
||||
let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone()));
|
||||
sqlite_repo
|
||||
.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let user_repository: Arc<dyn UserRepository> =
|
||||
Arc::new(SqliteUserRepository::new(pool.clone()));
|
||||
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
async fn wire_postgres(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::PgPool,
|
||||
Arc<dyn MovieRepository>,
|
||||
Arc<dyn ReviewRepository>,
|
||||
Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>,
|
||||
Arc<dyn UserRepository>,
|
||||
)> {
|
||||
let pool = sqlx::PgPool::connect(database_url)
|
||||
.await
|
||||
.context("Failed to connect to PostgreSQL database")?;
|
||||
|
||||
let pg_repo = Arc::new(PostgresRepository::new(pool.clone()));
|
||||
pg_repo
|
||||
.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&pg_repo) as _;
|
||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&pg_repo) as _;
|
||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&pg_repo) as _;
|
||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&pg_repo) as _;
|
||||
let user_repository: Arc<dyn UserRepository> =
|
||||
Arc::new(PostgresUserRepository::new(pool.clone()));
|
||||
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
Reference in New Issue
Block a user