diff --git a/.env.example b/.env.example index 8efd8c8..2ebd92d 100644 --- a/.env.example +++ b/.env.example @@ -39,4 +39,11 @@ ALLOW_REGISTRATION=false RATE_LIMIT=20 POSTER_FETCH_TIMEOUT_SECONDS=30 EVENT_CHANNEL_BUFFER=128 + +# NATS event bus (optional — falls back to in-memory channel when unset) +# NATS_URL=nats://localhost:4222 +# NATS_MODE=jetstream # "jetstream" (default, at-least-once) or "core" (fire-and-forget) +# NATS_SUBJECT_PREFIX=movies-diary.events +# NATS_STREAM_NAME=MOVIES_DIARY_EVENTS +# NATS_CONSUMER_NAME=worker RUST_LOG=presentation=debug,tower_http=debug diff --git a/Cargo.lock b/Cargo.lock index cbfe683..113eb85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -254,7 +254,7 @@ checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ "cfg-if", "cipher", - "cpufeatures", + "cpufeatures 0.2.17", ] [[package]] @@ -328,7 +328,7 @@ checksum = "3c3610892ee6e0cbce8ae2700349fcf8f98adb0dbfbee85aec3c9179d29cc072" dependencies = [ "base64ct", "blake2", - "cpufeatures", + "cpufeatures 0.2.17", "password-hash", ] @@ -452,6 +452,42 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-nats" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31811585c7c5bc2f60f8b80d5a6b0f737115611dac47567d7f7d94562ebb180b" +dependencies = [ + "base64", + "bytes", + "futures-util", + "memchr", + "nkeys", + "nuid", + "pin-project", + "portable-atomic", + "rand 0.10.1", + "regex", + "ring", + "rustls-native-certs", + "rustls-pki-types", + "rustls-webpki", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 2.0.18", + "time", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-process" version = "2.5.0" @@ -794,6 +830,9 @@ name = "bytes" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +dependencies = [ + "serde", +] [[package]] name = "bytestring" @@ -857,6 +896,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chrono" version = "0.4.44" @@ -973,6 +1023,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.4.0" @@ -1098,6 +1157,32 @@ dependencies = [ "memchr", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "darling" version = "0.20.11" @@ -1181,6 +1266,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" + [[package]] name = "deltae" version = "0.3.2" @@ -1205,6 +1296,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", + "serde_core", ] [[package]] @@ -1376,6 +1468,28 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2", + "signature", + "subtle", +] + [[package]] name = "either" version = "1.15.0" @@ -1550,6 +1664,12 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "filedescriptor" version = "0.8.3" @@ -1807,6 +1927,7 @@ dependencies = [ "cfg-if", "libc", "r-efi 6.0.0", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -2699,6 +2820,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "nats" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-nats", + "async-trait", + "chrono", + "domain", + "futures", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "never" version = "0.1.0" @@ -2718,6 +2857,21 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.17", + "log", + "rand 0.8.6", + "signatory", +] + [[package]] name = "nom" version = "7.1.3" @@ -2749,6 +2903,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.6", +] + [[package]] name = "num" version = "0.4.3" @@ -3092,6 +3255,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbf0d9e68100b3a7989b4901972f265cd542e560a3a8a724e1e20322f4d06ce9" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a990e22f43e84855daf260dded30524ef4a9021cc7541c26540500a50b624389" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -3199,6 +3382,10 @@ dependencies = [ "uuid", ] +[[package]] +name = "postgres-event-queue" +version = "0.1.0" + [[package]] name = "postgres-federation" version = "0.1.0" @@ -3259,6 +3446,7 @@ dependencies = [ "http-body-util", "infer", "metadata", + "nats", "percent-encoding", "poster-fetcher", "poster-storage", @@ -3418,6 +3606,17 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -3456,6 +3655,12 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "ratatui" version = "0.30.0" @@ -4027,6 +4232,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.20" @@ -4068,7 +4282,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -4085,7 +4299,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -4135,6 +4349,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -4283,6 +4509,10 @@ dependencies = [ "uuid", ] +[[package]] +name = "sqlite-event-queue" +version = "0.1.0" + [[package]] name = "sqlite-federation" version = "0.1.0" @@ -4880,6 +5110,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-sink", + "http 1.4.0", + "httparse", + "rand 0.8.6", + "ring", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tokio-util", + "webpki-roots 0.26.11", +] + [[package]] name = "toml_datetime" version = "1.1.1+spec-1.1.0" @@ -5034,6 +5285,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tui" version = "0.1.0" @@ -5956,6 +6217,7 @@ dependencies = [ "export", "futures", "metadata", + "nats", "poster-fetcher", "poster-storage", "postgres", diff --git a/Cargo.toml b/Cargo.toml index 90a90db..6f982a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,13 @@ members = [ "crates/adapters/postgres", "crates/adapters/sqlite-federation", "crates/adapters/postgres-federation", + "crates/adapters/sqlite-event-queue", + "crates/adapters/postgres-event-queue", "crates/adapters/template-askama", "crates/adapters/activitypub", "crates/adapters/activitypub-base", "crates/adapters/export", + "crates/adapters/nats", "crates/application", "crates/domain", "crates/presentation", @@ -64,3 +67,6 @@ template-askama = { path = "crates/adapters/template-askama" } activitypub = { path = "crates/adapters/activitypub" } activitypub-base = { path = "crates/adapters/activitypub-base" } doc = { path = "crates/doc" } +nats = { path = "crates/adapters/nats" } +sqlite-event-queue = { path = "crates/adapters/sqlite-event-queue" } +postgres-event-queue = { path = "crates/adapters/postgres-event-queue" } diff --git a/Dockerfile b/Dockerfile index ae6bd78..4519fca 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,6 +11,7 @@ COPY crates/adapters/activitypub/Cargo.toml crates/adapters/activitypub/Ca COPY crates/adapters/activitypub-base/Cargo.toml crates/adapters/activitypub-base/Cargo.toml COPY crates/adapters/auth/Cargo.toml crates/adapters/auth/Cargo.toml COPY crates/adapters/event-publisher/Cargo.toml crates/adapters/event-publisher/Cargo.toml +COPY crates/adapters/nats/Cargo.toml crates/adapters/nats/Cargo.toml COPY crates/adapters/metadata/Cargo.toml crates/adapters/metadata/Cargo.toml COPY crates/adapters/poster-fetcher/Cargo.toml crates/adapters/poster-fetcher/Cargo.toml COPY crates/adapters/poster-storage/Cargo.toml crates/adapters/poster-storage/Cargo.toml diff --git a/crates/adapters/nats/Cargo.toml b/crates/adapters/nats/Cargo.toml new file mode 100644 index 0000000..c9a5fec --- /dev/null +++ b/crates/adapters/nats/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "nats" +version = "0.1.0" +edition = "2024" + +[dependencies] +async-nats = "0.48.0" + +domain = { workspace = true } +async-trait = { workspace = true } +anyhow = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +tokio = { workspace = true } +futures = { workspace = true } diff --git a/crates/adapters/nats/src/config.rs b/crates/adapters/nats/src/config.rs new file mode 100644 index 0000000..402e63d --- /dev/null +++ b/crates/adapters/nats/src/config.rs @@ -0,0 +1,101 @@ +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum NatsMode { + Core, + JetStream, +} + +#[derive(Debug, Clone)] +pub struct NatsConfig { + pub url: String, + pub mode: NatsMode, + pub subject_prefix: String, + pub stream_name: String, + pub consumer_name: String, +} + +impl NatsConfig { + pub fn from_env() -> anyhow::Result { + let url = std::env::var("NATS_URL") + .map_err(|_| anyhow::anyhow!("NATS_URL is not set"))?; + + let mode = match std::env::var("NATS_MODE") + .unwrap_or_else(|_| "jetstream".to_string()) + .as_str() + { + "core" => NatsMode::Core, + "jetstream" => NatsMode::JetStream, + other => anyhow::bail!("unknown NATS_MODE: {other}"), + }; + + let subject_prefix = std::env::var("NATS_SUBJECT_PREFIX") + .unwrap_or_else(|_| "movies-diary.events".to_string()); + let stream_name = std::env::var("NATS_STREAM_NAME") + .unwrap_or_else(|_| "MOVIES_DIARY_EVENTS".to_string()); + let consumer_name = std::env::var("NATS_CONSUMER_NAME") + .unwrap_or_else(|_| "worker".to_string()); + + Ok(Self { url, mode, subject_prefix, stream_name, consumer_name }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn errors_without_nats_url() { + unsafe { std::env::remove_var("NATS_URL"); } + assert!(NatsConfig::from_env().is_err()); + } + + #[test] + fn defaults_with_only_url() { + unsafe { + std::env::set_var("NATS_URL", "nats://localhost:4222"); + std::env::remove_var("NATS_MODE"); + std::env::remove_var("NATS_SUBJECT_PREFIX"); + std::env::remove_var("NATS_STREAM_NAME"); + std::env::remove_var("NATS_CONSUMER_NAME"); + } + + let cfg = NatsConfig::from_env().unwrap(); + assert_eq!(cfg.url, "nats://localhost:4222"); + assert_eq!(cfg.mode, NatsMode::JetStream); + assert_eq!(cfg.subject_prefix, "movies-diary.events"); + assert_eq!(cfg.stream_name, "MOVIES_DIARY_EVENTS"); + assert_eq!(cfg.consumer_name, "worker"); + + unsafe { std::env::remove_var("NATS_URL"); } + } + + #[test] + fn core_mode_parsed() { + unsafe { + std::env::set_var("NATS_URL", "nats://test:4222"); + std::env::set_var("NATS_MODE", "core"); + } + + let cfg = NatsConfig::from_env().unwrap(); + assert_eq!(cfg.mode, NatsMode::Core); + + unsafe { + std::env::remove_var("NATS_URL"); + std::env::remove_var("NATS_MODE"); + } + } + + #[test] + fn invalid_mode_errors() { + unsafe { + std::env::set_var("NATS_URL", "nats://test:4222"); + std::env::set_var("NATS_MODE", "kafka"); + } + + assert!(NatsConfig::from_env().is_err()); + + unsafe { + std::env::remove_var("NATS_URL"); + std::env::remove_var("NATS_MODE"); + } + } +} diff --git a/crates/adapters/nats/src/consumer.rs b/crates/adapters/nats/src/consumer.rs new file mode 100644 index 0000000..caca808 --- /dev/null +++ b/crates/adapters/nats/src/consumer.rs @@ -0,0 +1,213 @@ +use async_nats::{ + Client, + jetstream::{self, consumer::pull, message::AckKind, stream::Config as StreamConfig}, +}; +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::{AckHandle, DomainEvent, EventEnvelope}, + ports::EventConsumer, +}; +use futures::{ + StreamExt, + stream::{self, BoxStream}, +}; +use std::sync::Arc; +use tokio::sync::{Mutex, mpsc}; + +use crate::{config::NatsConfig, payload::NatsEventPayload, subject::consumer_subject_filter}; + +// ── JetStream ack handle ───────────────────────────────────────────────────── + +struct NatsJetStreamAckHandle { + message: async_nats::jetstream::Message, +} + +#[async_trait] +impl AckHandle for NatsJetStreamAckHandle { + async fn ack(&self) -> Result<(), DomainError> { + tracing::debug!( + "acknowledging message with sequence {}", + self.message.info().unwrap().stream_sequence + ); + + self.message + .ack() + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + + async fn nack(&self) -> Result<(), DomainError> { + tracing::debug!( + "negatively acknowledging message with sequence {}", + self.message.info().unwrap().stream_sequence + ); + + self.message + .ack_with(AckKind::Nak(None)) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } +} + +// ── Core NATS ack handle (no-op) ───────────────────────────────────────────── + +struct NoopAck; + +#[async_trait] +impl AckHandle for NoopAck { + async fn ack(&self) -> Result<(), DomainError> { + Ok(()) + } + async fn nack(&self) -> Result<(), DomainError> { + Ok(()) + } +} + +// ── Envelope construction helpers ──────────────────────────────────────────── + +fn decode_js(msg: async_nats::jetstream::Message) -> Result { + let payload: NatsEventPayload = serde_json::from_slice(&msg.payload) + .map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?; + let event = DomainEvent::try_from(payload)?; + Ok(EventEnvelope::new( + event, + Box::new(NatsJetStreamAckHandle { message: msg }), + )) +} + +fn decode_core(msg: async_nats::Message) -> Result { + let payload: NatsEventPayload = serde_json::from_slice(&msg.payload) + .map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?; + let event = DomainEvent::try_from(payload)?; + Ok(EventEnvelope::new(event, Box::new(NoopAck))) +} + +// ── Channel-bridge shared by both consumers ────────────────────────────────── + +type EnvelopeRx = Arc>>>; + +fn consume_from_rx(rx: EnvelopeRx) -> BoxStream<'static, Result> { + Box::pin(stream::unfold(rx, |rx| async move { + let item = rx.lock().await.recv().await?; + Some((item, rx)) + })) +} + +// ── JetStream consumer ──────────────────────────────────────────────────────── + +pub struct NatsJetStreamConsumer { + rx: EnvelopeRx, +} + +impl NatsJetStreamConsumer { + pub async fn create(cfg: &NatsConfig, client: Client) -> anyhow::Result { + let js = jetstream::new(client); + + let stream = js + .get_or_create_stream(StreamConfig { + name: cfg.stream_name.clone(), + subjects: vec![consumer_subject_filter(&cfg.subject_prefix)], + max_messages: 100_000, + ..Default::default() + }) + .await?; + + let subject_filter = consumer_subject_filter(&cfg.subject_prefix); + let consumer = stream + .get_or_create_consumer( + cfg.consumer_name.as_str(), + pull::Config { + durable_name: Some(cfg.consumer_name.clone()), + filter_subject: subject_filter, + ..Default::default() + }, + ) + .await?; + + let (tx, rx) = mpsc::channel(128); + + tokio::spawn(async move { + loop { + let mut messages = match consumer.messages().await { + Err(e) => { + tracing::error!("failed to fetch messages: {}", e); + + let _ = tx + .send(Err(DomainError::InfrastructureError(e.to_string()))) + .await; + return; + } + Ok(m) => m, + }; + while let Some(result) = messages.next().await { + let envelope = result + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + .and_then(decode_js); + + if tx.send(envelope).await.is_err() { + tracing::info!("consumer channel closed, stopping message processing"); + return; + } + + tracing::debug!("message sent to consumer channel"); + } + // messages() stream ended (fetch expired in strict mode) — restart + } + }); + + Ok(Self { + rx: Arc::new(Mutex::new(rx)), + }) + } +} + +impl EventConsumer for NatsJetStreamConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + consume_from_rx(Arc::clone(&self.rx)) + } +} + +// ── Core NATS consumer ──────────────────────────────────────────────────────── + +pub struct NatsCoreConsumer { + rx: EnvelopeRx, +} + +impl NatsCoreConsumer { + pub async fn create(cfg: &NatsConfig, client: Client) -> anyhow::Result { + let subject = consumer_subject_filter(&cfg.subject_prefix); + let mut subscriber = client.subscribe(subject).await?; + + let (tx, rx) = mpsc::channel(128); + + tokio::spawn(async move { + while let Some(msg) = subscriber.next().await { + let envelope = decode_core(msg); + + tracing::debug!("message received and decoded, sending to consumer channel"); + + if tx.send(envelope).await.is_err() { + tracing::info!("consumer channel closed, stopping message processing"); + break; + } + } + }); + + Ok(Self { + rx: Arc::new(Mutex::new(rx)), + }) + } +} + +impl EventConsumer for NatsCoreConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + consume_from_rx(Arc::clone(&self.rx)) + } +} + +fn _assert_send_sync() { + fn check() {} + check::(); + check::(); +} diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs new file mode 100644 index 0000000..a762b36 --- /dev/null +++ b/crates/adapters/nats/src/lib.rs @@ -0,0 +1,52 @@ +mod config; +mod consumer; +mod payload; +mod publisher; +mod subject; + +pub use config::{NatsConfig, NatsMode}; +pub use consumer::{NatsCoreConsumer, NatsJetStreamConsumer}; +pub use publisher::NatsEventPublisher; + +use std::sync::Arc; + +use domain::ports::{EventConsumer, EventPublisher}; + +pub async fn create_publisher(cfg: NatsConfig) -> anyhow::Result> { + let client = async_nats::connect(&cfg.url).await?; + let publisher: Arc = match cfg.mode { + NatsMode::Core => Arc::new(NatsEventPublisher::new_core(client, cfg.subject_prefix)), + NatsMode::JetStream => Arc::new(NatsEventPublisher::new_jetstream( + client, + cfg.subject_prefix, + )), + }; + + tracing::info!("NATS publisher created (mode: {:?})", cfg.mode); + Ok(publisher) +} + +pub async fn create_channel( + cfg: NatsConfig, +) -> anyhow::Result<(Arc, Arc)> { + let client = async_nats::connect(&cfg.url).await?; + + let publisher: Arc = match cfg.mode { + NatsMode::Core => Arc::new(NatsEventPublisher::new_core( + client.clone(), + cfg.subject_prefix.clone(), + )), + NatsMode::JetStream => Arc::new(NatsEventPublisher::new_jetstream( + client.clone(), + cfg.subject_prefix.clone(), + )), + }; + + let consumer: Arc = match cfg.mode { + NatsMode::Core => Arc::new(NatsCoreConsumer::create(&cfg, client).await?), + NatsMode::JetStream => Arc::new(NatsJetStreamConsumer::create(&cfg, client).await?), + }; + + tracing::info!("NATS channel created (mode: {:?})", cfg.mode); + Ok((publisher, consumer)) +} diff --git a/crates/adapters/nats/src/payload.rs b/crates/adapters/nats/src/payload.rs new file mode 100644 index 0000000..dab12bf --- /dev/null +++ b/crates/adapters/nats/src/payload.rs @@ -0,0 +1,172 @@ +use chrono::NaiveDateTime; +use domain::{ + errors::DomainError, + events::DomainEvent, + value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", content = "data")] +pub enum NatsEventPayload { + ReviewLogged { + review_id: String, + movie_id: String, + user_id: String, + rating: u8, + watched_at: i64, + }, + ReviewUpdated { + review_id: String, + movie_id: String, + user_id: String, + rating: u8, + watched_at: i64, + }, + MovieDiscovered { + movie_id: String, + external_metadata_id: String, + }, +} + +fn parse_uuid(s: &str, field: &str) -> Result { + Uuid::parse_str(s) + .map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}"))) +} + +fn parse_ts(ts: i64) -> Result { + chrono::DateTime::from_timestamp(ts, 0) + .map(|dt| dt.naive_utc()) + .ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}"))) +} + +impl From<&DomainEvent> for NatsEventPayload { + fn from(event: &DomainEvent) -> Self { + match event { + DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { + NatsEventPayload::ReviewLogged { + review_id: review_id.value().to_string(), + movie_id: movie_id.value().to_string(), + user_id: user_id.value().to_string(), + rating: rating.value(), + watched_at: watched_at.and_utc().timestamp(), + } + } + DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { + NatsEventPayload::ReviewUpdated { + review_id: review_id.value().to_string(), + movie_id: movie_id.value().to_string(), + user_id: user_id.value().to_string(), + rating: rating.value(), + watched_at: watched_at.and_utc().timestamp(), + } + } + DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => { + NatsEventPayload::MovieDiscovered { + movie_id: movie_id.value().to_string(), + external_metadata_id: external_metadata_id.value().to_owned(), + } + } + } + } +} + +impl TryFrom for DomainEvent { + type Error = DomainError; + fn try_from(payload: NatsEventPayload) -> Result { + match payload { + NatsEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { + Ok(DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + rating: Rating::new(rating)?, + watched_at: parse_ts(watched_at)?, + }) + } + NatsEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { + Ok(DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + rating: Rating::new(rating)?, + watched_at: parse_ts(watched_at)?, + }) + } + NatsEventPayload::MovieDiscovered { movie_id, external_metadata_id } => { + Ok(DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + external_metadata_id: ExternalMetadataId::new(external_metadata_id)?, + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fixed_dt() -> NaiveDateTime { + chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc() + } + + fn review_logged() -> DomainEvent { + DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(4).unwrap(), + watched_at: fixed_dt(), + } + } + + fn review_updated() -> DomainEvent { + DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(3).unwrap(), + watched_at: fixed_dt(), + } + } + + fn movie_discovered() -> DomainEvent { + DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(Uuid::new_v4()), + external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), + } + } + + fn round_trip(event: DomainEvent) { + let payload = NatsEventPayload::from(&event); + let json = serde_json::to_string(&payload).expect("serialize"); + let back: NatsEventPayload = serde_json::from_str(&json).expect("deserialize"); + let recovered = DomainEvent::try_from(back).expect("try_from"); + assert_eq!(NatsEventPayload::from(&event), NatsEventPayload::from(&recovered)); + } + + #[test] + fn round_trip_review_logged() { + round_trip(review_logged()); + } + + #[test] + fn round_trip_review_updated() { + round_trip(review_updated()); + } + + #[test] + fn round_trip_movie_discovered() { + round_trip(movie_discovered()); + } + + #[test] + fn serialized_format_is_tagged() { + let payload = NatsEventPayload::from(&movie_discovered()); + let json = serde_json::to_string(&payload).unwrap(); + assert!(json.contains(r#""type":"MovieDiscovered""#)); + assert!(json.contains(r#""data":"#)); + } +} diff --git a/crates/adapters/nats/src/publisher.rs b/crates/adapters/nats/src/publisher.rs new file mode 100644 index 0000000..5825e2c --- /dev/null +++ b/crates/adapters/nats/src/publisher.rs @@ -0,0 +1,54 @@ +use async_nats::{jetstream, Client}; +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; + +use crate::{payload::NatsEventPayload, subject::event_to_subject}; + +enum PublisherMode { + Core(Client), + JetStream(jetstream::Context), +} + +pub struct NatsEventPublisher { + mode: PublisherMode, + subject_prefix: String, +} + +impl NatsEventPublisher { + pub fn new_core(client: Client, subject_prefix: String) -> Self { + Self { mode: PublisherMode::Core(client), subject_prefix } + } + + pub fn new_jetstream(client: Client, subject_prefix: String) -> Self { + Self { mode: PublisherMode::JetStream(jetstream::new(client)), subject_prefix } + } +} + +#[async_trait] +impl EventPublisher for NatsEventPublisher { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + let subject = event_to_subject(&self.subject_prefix, event); + let payload = serde_json::to_vec(&NatsEventPayload::from(event)) + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + + match &self.mode { + PublisherMode::Core(client) => client + .publish(subject, payload.into()) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())), + + PublisherMode::JetStream(js) => js + .publish(subject, payload.into()) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))? + .await + .map(|_| ()) + .map_err(|e| DomainError::InfrastructureError(e.to_string())), + } + } +} + +fn _assert_send_sync() { + fn check() {} + check::(); +} diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs new file mode 100644 index 0000000..2deafca --- /dev/null +++ b/crates/adapters/nats/src/subject.rs @@ -0,0 +1,76 @@ +use domain::events::DomainEvent; + +pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { + let suffix = match event { + DomainEvent::ReviewLogged { .. } => "review.logged", + DomainEvent::ReviewUpdated { .. } => "review.updated", + DomainEvent::MovieDiscovered { .. } => "movie.discovered", + }; + format!("{prefix}.{suffix}") +} + +pub fn consumer_subject_filter(prefix: &str) -> String { + format!("{prefix}.>") +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::NaiveDateTime; + use domain::value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}; + use uuid::Uuid; + + fn dt() -> NaiveDateTime { + chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc() + } + + #[test] + fn review_logged_subject() { + let event = DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(3).unwrap(), + watched_at: dt(), + }; + assert_eq!( + event_to_subject("movies-diary.events", &event), + "movies-diary.events.review.logged" + ); + } + + #[test] + fn review_updated_subject() { + let event = DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(5).unwrap(), + watched_at: dt(), + }; + assert_eq!( + event_to_subject("movies-diary.events", &event), + "movies-diary.events.review.updated" + ); + } + + #[test] + fn movie_discovered_subject() { + let event = DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(Uuid::new_v4()), + external_metadata_id: ExternalMetadataId::new("tt0000001".into()).unwrap(), + }; + assert_eq!( + event_to_subject("movies-diary.events", &event), + "movies-diary.events.movie.discovered" + ); + } + + #[test] + fn consumer_subject_filter_appends_wildcard() { + assert_eq!( + consumer_subject_filter("movies-diary.events"), + "movies-diary.events.>" + ); + } +} diff --git a/crates/adapters/postgres-event-queue/Cargo.toml b/crates/adapters/postgres-event-queue/Cargo.toml new file mode 100644 index 0000000..5c34a2c --- /dev/null +++ b/crates/adapters/postgres-event-queue/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "postgres-event-queue" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/crates/adapters/postgres-event-queue/src/lib.rs b/crates/adapters/postgres-event-queue/src/lib.rs new file mode 100644 index 0000000..b93cf3f --- /dev/null +++ b/crates/adapters/postgres-event-queue/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: u64, right: u64) -> u64 { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/crates/adapters/sqlite-event-queue/Cargo.toml b/crates/adapters/sqlite-event-queue/Cargo.toml new file mode 100644 index 0000000..fef4d4a --- /dev/null +++ b/crates/adapters/sqlite-event-queue/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "sqlite-event-queue" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/crates/adapters/sqlite-event-queue/src/lib.rs b/crates/adapters/sqlite-event-queue/src/lib.rs new file mode 100644 index 0000000..b93cf3f --- /dev/null +++ b/crates/adapters/sqlite-event-queue/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: u64, right: u64) -> u64 { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/crates/application/src/worker.rs b/crates/application/src/worker.rs index 01e1927..050fd55 100644 --- a/crates/application/src/worker.rs +++ b/crates/application/src/worker.rs @@ -20,7 +20,10 @@ impl WorkerService { let mut stream = self.consumer.consume(); while let Some(result) = stream.next().await { match result { - Ok(envelope) => self.dispatch(envelope).await, + Ok(envelope) => { + tracing::info!(event = ?envelope.event, "received event"); + self.dispatch(envelope).await; + } Err(e) => tracing::error!("event consumer error: {e}"), } } diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 2abf1ff..8ffbb92 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -9,8 +9,18 @@ sqlite = ["dep:sqlite"] postgres = ["dep:postgres"] # Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working federation = [] -sqlite-federation = ["sqlite", "dep:sqlite-federation", "dep:activitypub", "federation"] -postgres-federation = ["postgres", "dep:postgres-federation", "dep:activitypub", "federation"] +sqlite-federation = [ + "sqlite", + "dep:sqlite-federation", + "dep:activitypub", + "federation", +] +postgres-federation = [ + "postgres", + "dep:postgres-federation", + "dep:activitypub", + "federation", +] [dependencies] tower-http = { version = "0.6.8", features = ["fs", "trace", "tracing"] } @@ -39,6 +49,7 @@ poster-fetcher = { workspace = true } poster-storage = { workspace = true } template-askama = { workspace = true } event-publisher = { workspace = true } +nats = { workspace = true } rss = { workspace = true } export = { workspace = true } doc = { workspace = true } @@ -46,12 +57,12 @@ sqlx = { workspace = true } utoipa = { version = "5.5.0", features = ["axum_extras", "uuid"] } # Optional — database backends -sqlite = { workspace = true, optional = true } -postgres = { workspace = true, optional = true } +sqlite = { workspace = true, optional = true } +postgres = { workspace = true, optional = true } # Optional — federation -activitypub = { workspace = true, optional = true } -sqlite-federation = { workspace = true, optional = true } +activitypub = { workspace = true, optional = true } +sqlite-federation = { workspace = true, optional = true } postgres-federation = { workspace = true, optional = true } [dev-dependencies] diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 9504581..a74af36 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -38,9 +38,9 @@ use presentation::{openapi::ApiDoc, routes, state::AppState}; use utoipa::OpenApi as _; use domain::ports::{ - AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository, - PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository, - UserRepository, + AuthService, DiaryExporter, DiaryRepository, EventHandler, EventPublisher, MetadataClient, + MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, + StatsRepository, UserRepository, }; #[cfg(not(any(feature = "sqlite", feature = "postgres")))] @@ -184,26 +184,18 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { ); let ap_service_arc: Arc = concrete_ap_service; - let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3)); - let (event_publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); - let worker = WorkerService::new( - Arc::new(consumer), - vec![poster_handler, Arc::new(ap_event_handler)], - ); - tokio::spawn(worker.run()); - - let ep: Arc = Arc::new(event_publisher); + let ep = build_event_publisher( + handler_ctx, + vec![Arc::new(ap_event_handler) as Arc], + ).await?; (ep, ap_router, ap_service_arc, social_query_arc) }; #[cfg(not(feature = "federation"))] - let (event_publisher_arc, ap_router): (Arc, axum::Router) = { - let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3)); - let (event_publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); - let worker = WorkerService::new(Arc::new(consumer), vec![poster_handler]); - tokio::spawn(worker.run()); - (Arc::new(event_publisher), axum::Router::new()) - }; + let (event_publisher_arc, ap_router): (Arc, axum::Router) = ( + build_event_publisher(handler_ctx, vec![]).await?, + axum::Router::new(), + ); let app_ctx = AppContext { movie_repository, @@ -302,6 +294,23 @@ async fn wire_postgres(database_url: &str) -> anyhow::Result<( Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) } +async fn build_event_publisher( + handler_ctx: AppContext, + extra_handlers: Vec>, +) -> anyhow::Result> { + if let Ok(cfg) = nats::NatsConfig::from_env() { + tracing::info!("event bus: NATS ({})", cfg.url); + return nats::create_publisher(cfg).await; + } + tracing::info!("event bus: in-memory"); + let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3)); + let mut handlers: Vec> = vec![poster_handler]; + handlers.extend(extra_handlers); + let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); + tokio::spawn(WorkerService::new(Arc::new(consumer), handlers).run()); + Ok(Arc::new(publisher)) +} + fn init_tracing() { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 8d93b08..6a9eec3 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -29,8 +29,9 @@ metadata = { workspace = true } poster-fetcher = { workspace = true } poster-storage = { workspace = true } export = { workspace = true } +nats = { workspace = true } sqlx = { workspace = true } # Optional — database backends -sqlite = { workspace = true, optional = true } +sqlite = { workspace = true, optional = true } postgres = { workspace = true, optional = true } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 37a2a17..753e61f 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -70,9 +70,22 @@ async fn main() -> anyhow::Result<()> { _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"), }; - let (event_publisher_arc, consumer) = { - let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); - (Arc::new(publisher) as Arc, consumer) + let (event_publisher_arc, consumer_arc): ( + Arc, + Arc, + ) = match nats::NatsConfig::from_env() { + Ok(cfg) => { + tracing::info!("event bus: NATS ({})", cfg.url); + nats::create_channel(cfg).await? + } + Err(_) => { + tracing::info!("event bus: in-memory channel (NATS_URL not set)"); + let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); + ( + Arc::new(publisher) as Arc, + Arc::new(consumer) as Arc, + ) + } }; let ctx = AppContext { @@ -92,7 +105,7 @@ async fn main() -> anyhow::Result<()> { }; let poster_handler = Arc::new(PosterSyncHandler::new(ctx, 3)); - let worker = WorkerService::new(Arc::new(consumer), vec![poster_handler]); + let worker = WorkerService::new(consumer_arc, vec![poster_handler]); tracing::info!("worker started"); worker.run().await; @@ -102,9 +115,10 @@ async fn main() -> anyhow::Result<()> { } fn init_tracing() { + let filter = std::env::var("RUST_LOG") + .unwrap_or_else(|_| "worker=info,application=info".to_string()); tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "info".into())) + .with(tracing_subscriber::EnvFilter::new(filter)) .with(tracing_subscriber::fmt::layer()) .init(); }