Compare commits
6 Commits
3342905e22
...
660a8d618d
| Author | SHA1 | Date | |
|---|---|---|---|
| 660a8d618d | |||
| 126ab43287 | |||
| 44273457ae | |||
| 4b9fd8168d | |||
| d9613308a3 | |||
| 1f5c05a5a2 |
22
.env.example
22
.env.example
@@ -38,5 +38,23 @@ SECURE_COOKIES=false
|
||||
ALLOW_REGISTRATION=false
|
||||
RATE_LIMIT=20
|
||||
POSTER_FETCH_TIMEOUT_SECONDS=30
|
||||
EVENT_CHANNEL_BUFFER=128
|
||||
RUST_LOG=presentation=debug,tower_http=debug
|
||||
|
||||
# Event bus — "db" (default) or "nats"
|
||||
# The worker binary must run alongside the presentation to process events.
|
||||
EVENT_BUS_BACKEND=db
|
||||
|
||||
# Option A: DB queue (default — no extra infrastructure needed)
|
||||
# Events are persisted in the same database as the app and polled by the worker.
|
||||
# EVENT_QUEUE_POLL_INTERVAL_MS=500 # polling interval (default: 500ms)
|
||||
# EVENT_QUEUE_BATCH_SIZE=10 # rows claimed per poll cycle (default: 10)
|
||||
# EVENT_QUEUE_MAX_ATTEMPTS=5 # retries before dead-lettering (default: 5)
|
||||
|
||||
# Option B: NATS (at-least-once delivery, recommended for higher throughput)
|
||||
# EVENT_BUS_BACKEND=nats
|
||||
# NATS_URL=nats://localhost:4222
|
||||
# NATS_MODE=jetstream # "jetstream" (default, at-least-once) or "core" (fire-and-forget)
|
||||
# NATS_SUBJECT_PREFIX=movies-diary.events
|
||||
# NATS_STREAM_NAME=MOVIES_DIARY_EVENTS
|
||||
# NATS_CONSUMER_NAME=worker
|
||||
|
||||
RUST_LOG=presentation=debug,tower_http=debug,worker=info,application=info
|
||||
|
||||
300
Cargo.lock
generated
300
Cargo.lock
generated
@@ -254,7 +254,7 @@ checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cipher",
|
||||
"cpufeatures",
|
||||
"cpufeatures 0.2.17",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -328,7 +328,7 @@ checksum = "3c3610892ee6e0cbce8ae2700349fcf8f98adb0dbfbee85aec3c9179d29cc072"
|
||||
dependencies = [
|
||||
"base64ct",
|
||||
"blake2",
|
||||
"cpufeatures",
|
||||
"cpufeatures 0.2.17",
|
||||
"password-hash",
|
||||
]
|
||||
|
||||
@@ -452,6 +452,42 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-nats"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "31811585c7c5bc2f60f8b80d5a6b0f737115611dac47567d7f7d94562ebb180b"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"memchr",
|
||||
"nkeys",
|
||||
"nuid",
|
||||
"pin-project",
|
||||
"portable-atomic",
|
||||
"rand 0.10.1",
|
||||
"regex",
|
||||
"ring",
|
||||
"rustls-native-certs",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_nanos",
|
||||
"serde_repr",
|
||||
"thiserror 2.0.18",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tokio-websockets",
|
||||
"tracing",
|
||||
"tryhard",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-process"
|
||||
version = "2.5.0"
|
||||
@@ -794,6 +830,9 @@ name = "bytes"
|
||||
version = "1.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bytestring"
|
||||
@@ -857,6 +896,17 @@ version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
|
||||
|
||||
[[package]]
|
||||
name = "chacha20"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures 0.3.0",
|
||||
"rand_core 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.44"
|
||||
@@ -973,6 +1023,15 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crc"
|
||||
version = "3.4.0"
|
||||
@@ -1098,6 +1157,32 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "curve25519-dalek"
|
||||
version = "4.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures 0.2.17",
|
||||
"curve25519-dalek-derive",
|
||||
"digest",
|
||||
"fiat-crypto",
|
||||
"rustc_version",
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "curve25519-dalek-derive"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling"
|
||||
version = "0.20.11"
|
||||
@@ -1181,6 +1266,12 @@ dependencies = [
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "data-encoding"
|
||||
version = "2.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8"
|
||||
|
||||
[[package]]
|
||||
name = "deltae"
|
||||
version = "0.3.2"
|
||||
@@ -1205,6 +1296,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
|
||||
dependencies = [
|
||||
"powerfmt",
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1376,6 +1468,28 @@ version = "1.0.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
|
||||
|
||||
[[package]]
|
||||
name = "ed25519"
|
||||
version = "2.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53"
|
||||
dependencies = [
|
||||
"signature",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ed25519-dalek"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9"
|
||||
dependencies = [
|
||||
"curve25519-dalek",
|
||||
"ed25519",
|
||||
"sha2",
|
||||
"signature",
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.15.0"
|
||||
@@ -1550,6 +1664,12 @@ version = "2.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
|
||||
|
||||
[[package]]
|
||||
name = "fiat-crypto"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d"
|
||||
|
||||
[[package]]
|
||||
name = "filedescriptor"
|
||||
version = "0.8.3"
|
||||
@@ -1807,6 +1927,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"r-efi 6.0.0",
|
||||
"rand_core 0.10.1",
|
||||
"wasip2",
|
||||
"wasip3",
|
||||
]
|
||||
@@ -2699,6 +2820,24 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nats"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-nats",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
"futures",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "never"
|
||||
version = "0.1.0"
|
||||
@@ -2718,6 +2857,21 @@ dependencies = [
|
||||
"memoffset",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nkeys"
|
||||
version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf"
|
||||
dependencies = [
|
||||
"data-encoding",
|
||||
"ed25519",
|
||||
"ed25519-dalek",
|
||||
"getrandom 0.2.17",
|
||||
"log",
|
||||
"rand 0.8.6",
|
||||
"signatory",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
@@ -2749,6 +2903,15 @@ dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nuid"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83"
|
||||
dependencies = [
|
||||
"rand 0.8.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num"
|
||||
version = "0.4.3"
|
||||
@@ -3092,6 +3255,26 @@ dependencies = [
|
||||
"siphasher",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cbf0d9e68100b3a7989b4901972f265cd542e560a3a8a724e1e20322f4d06ce9"
|
||||
dependencies = [
|
||||
"pin-project-internal",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-internal"
|
||||
version = "1.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a990e22f43e84855daf260dded30524ef4a9021cc7541c26540500a50b624389"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.17"
|
||||
@@ -3199,6 +3382,23 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-event-queue"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
"futures",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-federation"
|
||||
version = "0.1.0"
|
||||
@@ -3259,15 +3459,18 @@ dependencies = [
|
||||
"http-body-util",
|
||||
"infer",
|
||||
"metadata",
|
||||
"nats",
|
||||
"percent-encoding",
|
||||
"poster-fetcher",
|
||||
"poster-storage",
|
||||
"postgres",
|
||||
"postgres-event-queue",
|
||||
"postgres-federation",
|
||||
"rss 0.1.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlite",
|
||||
"sqlite-event-queue",
|
||||
"sqlite-federation",
|
||||
"sqlx",
|
||||
"template-askama",
|
||||
@@ -3418,6 +3621,17 @@ dependencies = [
|
||||
"rand_core 0.9.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207"
|
||||
dependencies = [
|
||||
"chacha20",
|
||||
"getrandom 0.4.2",
|
||||
"rand_core 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.3.1"
|
||||
@@ -3456,6 +3670,12 @@ dependencies = [
|
||||
"getrandom 0.3.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69"
|
||||
|
||||
[[package]]
|
||||
name = "ratatui"
|
||||
version = "0.30.0"
|
||||
@@ -4027,6 +4247,15 @@ dependencies = [
|
||||
"zmij",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_nanos"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_path_to_error"
|
||||
version = "0.1.20"
|
||||
@@ -4068,7 +4297,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"cpufeatures 0.2.17",
|
||||
"digest",
|
||||
]
|
||||
|
||||
@@ -4085,7 +4314,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"cpufeatures 0.2.17",
|
||||
"digest",
|
||||
]
|
||||
|
||||
@@ -4135,6 +4364,18 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signatory"
|
||||
version = "0.27.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31"
|
||||
dependencies = [
|
||||
"pkcs8",
|
||||
"rand_core 0.6.4",
|
||||
"signature",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signature"
|
||||
version = "2.2.0"
|
||||
@@ -4283,6 +4524,23 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlite-event-queue"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
"futures",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlite-federation"
|
||||
version = "0.1.0"
|
||||
@@ -4880,6 +5138,27 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-websockets"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"http 1.4.0",
|
||||
"httparse",
|
||||
"rand 0.8.6",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tokio-util",
|
||||
"webpki-roots 0.26.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml_datetime"
|
||||
version = "1.1.1+spec-1.1.0"
|
||||
@@ -5034,6 +5313,16 @@ version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
|
||||
|
||||
[[package]]
|
||||
name = "tryhard"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5"
|
||||
dependencies = [
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tui"
|
||||
version = "0.1.0"
|
||||
@@ -5956,12 +6245,15 @@ dependencies = [
|
||||
"export",
|
||||
"futures",
|
||||
"metadata",
|
||||
"nats",
|
||||
"poster-fetcher",
|
||||
"poster-storage",
|
||||
"postgres",
|
||||
"postgres-event-queue",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlite",
|
||||
"sqlite-event-queue",
|
||||
"sqlx",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
|
||||
@@ -10,10 +10,13 @@ members = [
|
||||
"crates/adapters/postgres",
|
||||
"crates/adapters/sqlite-federation",
|
||||
"crates/adapters/postgres-federation",
|
||||
"crates/adapters/sqlite-event-queue",
|
||||
"crates/adapters/postgres-event-queue",
|
||||
"crates/adapters/template-askama",
|
||||
"crates/adapters/activitypub",
|
||||
"crates/adapters/activitypub-base",
|
||||
"crates/adapters/export",
|
||||
"crates/adapters/nats",
|
||||
"crates/application",
|
||||
"crates/domain",
|
||||
"crates/presentation",
|
||||
@@ -64,3 +67,6 @@ template-askama = { path = "crates/adapters/template-askama" }
|
||||
activitypub = { path = "crates/adapters/activitypub" }
|
||||
activitypub-base = { path = "crates/adapters/activitypub-base" }
|
||||
doc = { path = "crates/doc" }
|
||||
nats = { path = "crates/adapters/nats" }
|
||||
sqlite-event-queue = { path = "crates/adapters/sqlite-event-queue" }
|
||||
postgres-event-queue = { path = "crates/adapters/postgres-event-queue" }
|
||||
|
||||
11
Dockerfile
11
Dockerfile
@@ -11,15 +11,18 @@ COPY crates/adapters/activitypub/Cargo.toml crates/adapters/activitypub/Ca
|
||||
COPY crates/adapters/activitypub-base/Cargo.toml crates/adapters/activitypub-base/Cargo.toml
|
||||
COPY crates/adapters/auth/Cargo.toml crates/adapters/auth/Cargo.toml
|
||||
COPY crates/adapters/event-publisher/Cargo.toml crates/adapters/event-publisher/Cargo.toml
|
||||
COPY crates/adapters/nats/Cargo.toml crates/adapters/nats/Cargo.toml
|
||||
COPY crates/adapters/metadata/Cargo.toml crates/adapters/metadata/Cargo.toml
|
||||
COPY crates/adapters/poster-fetcher/Cargo.toml crates/adapters/poster-fetcher/Cargo.toml
|
||||
COPY crates/adapters/poster-storage/Cargo.toml crates/adapters/poster-storage/Cargo.toml
|
||||
COPY crates/adapters/export/Cargo.toml crates/adapters/export/Cargo.toml
|
||||
COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml
|
||||
COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml
|
||||
COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml
|
||||
COPY crates/adapters/postgres/Cargo.toml crates/adapters/postgres/Cargo.toml
|
||||
COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml
|
||||
COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml
|
||||
COPY crates/adapters/sqlite-event-queue/Cargo.toml crates/adapters/sqlite-event-queue/Cargo.toml
|
||||
COPY crates/adapters/postgres/Cargo.toml crates/adapters/postgres/Cargo.toml
|
||||
COPY crates/adapters/postgres-federation/Cargo.toml crates/adapters/postgres-federation/Cargo.toml
|
||||
COPY crates/adapters/postgres-event-queue/Cargo.toml crates/adapters/postgres-event-queue/Cargo.toml
|
||||
COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml
|
||||
COPY crates/application/Cargo.toml crates/application/Cargo.toml
|
||||
COPY crates/domain/Cargo.toml crates/domain/Cargo.toml
|
||||
@@ -42,6 +45,8 @@ COPY crates ./crates
|
||||
#
|
||||
# To build with PostgreSQL backend instead:
|
||||
# --build-arg FEATURES=postgres,postgres-federation
|
||||
# To add NATS support (EVENT_BUS_BACKEND=nats):
|
||||
# --build-arg FEATURES=sqlite,sqlite-federation,nats
|
||||
ARG FEATURES=sqlite,sqlite-federation
|
||||
RUN cargo build --release -p presentation -p worker --no-default-features --features "${FEATURES}"
|
||||
|
||||
|
||||
27
README.md
27
README.md
@@ -34,10 +34,14 @@ adapters/
|
||||
template-askama — Askama HTML rendering
|
||||
rss — RSS/Atom feed generation
|
||||
export — CSV and JSON diary serialization
|
||||
event-publisher — async event channel for background poster sync
|
||||
activitypub — ActivityPub federation (follow, inbox/outbox, actor)
|
||||
activitypub-base — core ActivityPub types and repository traits
|
||||
sqlite-event-queue — polling event queue backed by SQLite (DB-queue mode)
|
||||
postgres-event-queue — polling event queue backed by PostgreSQL (DB-queue mode)
|
||||
nats — NATS Core / JetStream event publisher and consumer
|
||||
event-publisher — in-memory event channel (used in tests)
|
||||
activitypub — ActivityPub federation (follow, inbox/outbox, actor)
|
||||
activitypub-base — core ActivityPub types and repository traits
|
||||
doc — OpenAPI spec assembly and Swagger UI / Scalar serving
|
||||
worker — standalone worker binary (polls the event queue, syncs posters)
|
||||
tui — terminal UI client (ratatui)
|
||||
```
|
||||
|
||||
@@ -85,16 +89,27 @@ PORT=3000
|
||||
RATE_LIMIT=60 # requests per minute per IP (default: 60)
|
||||
ALLOW_REGISTRATION=true # set to false to disable new sign-ups
|
||||
SECURE_COOKIES=true # set when serving over HTTPS
|
||||
RUST_LOG=presentation=info,tower_http=info
|
||||
RUST_LOG=presentation=info,tower_http=info,worker=info,application=info
|
||||
|
||||
# Event bus — "db" (default, uses same database) or "nats"
|
||||
EVENT_BUS_BACKEND=db
|
||||
# NATS_URL=nats://localhost:4222 # required when EVENT_BUS_BACKEND=nats
|
||||
```
|
||||
|
||||
The `worker` binary must run alongside `presentation` to process events:
|
||||
|
||||
```bash
|
||||
cargo run -p worker
|
||||
```
|
||||
|
||||
## Run
|
||||
|
||||
```bash
|
||||
cargo run -p presentation
|
||||
cargo run -p presentation # HTTP server (0.0.0.0:3000)
|
||||
cargo run -p worker # event worker (poster sync, in a separate terminal)
|
||||
```
|
||||
|
||||
Server listens on `0.0.0.0:3000` by default.
|
||||
The worker polls the event queue and must run alongside the presentation to process background tasks like poster fetching. Both processes share the same database.
|
||||
|
||||
## API
|
||||
|
||||
|
||||
19
crates/adapters/nats/Cargo.toml
Normal file
19
crates/adapters/nats/Cargo.toml
Normal file
@@ -0,0 +1,19 @@
|
||||
[package]
|
||||
name = "nats"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
async-nats = "0.48.0"
|
||||
|
||||
domain = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
101
crates/adapters/nats/src/config.rs
Normal file
101
crates/adapters/nats/src/config.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum NatsMode {
|
||||
Core,
|
||||
JetStream,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NatsConfig {
|
||||
pub url: String,
|
||||
pub mode: NatsMode,
|
||||
pub subject_prefix: String,
|
||||
pub stream_name: String,
|
||||
pub consumer_name: String,
|
||||
}
|
||||
|
||||
impl NatsConfig {
|
||||
pub fn from_env() -> anyhow::Result<Self> {
|
||||
let url = std::env::var("NATS_URL")
|
||||
.map_err(|_| anyhow::anyhow!("NATS_URL is not set"))?;
|
||||
|
||||
let mode = match std::env::var("NATS_MODE")
|
||||
.unwrap_or_else(|_| "jetstream".to_string())
|
||||
.as_str()
|
||||
{
|
||||
"core" => NatsMode::Core,
|
||||
"jetstream" => NatsMode::JetStream,
|
||||
other => anyhow::bail!("unknown NATS_MODE: {other}"),
|
||||
};
|
||||
|
||||
let subject_prefix = std::env::var("NATS_SUBJECT_PREFIX")
|
||||
.unwrap_or_else(|_| "movies-diary.events".to_string());
|
||||
let stream_name = std::env::var("NATS_STREAM_NAME")
|
||||
.unwrap_or_else(|_| "MOVIES_DIARY_EVENTS".to_string());
|
||||
let consumer_name = std::env::var("NATS_CONSUMER_NAME")
|
||||
.unwrap_or_else(|_| "worker".to_string());
|
||||
|
||||
Ok(Self { url, mode, subject_prefix, stream_name, consumer_name })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn errors_without_nats_url() {
|
||||
unsafe { std::env::remove_var("NATS_URL"); }
|
||||
assert!(NatsConfig::from_env().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn defaults_with_only_url() {
|
||||
unsafe {
|
||||
std::env::set_var("NATS_URL", "nats://localhost:4222");
|
||||
std::env::remove_var("NATS_MODE");
|
||||
std::env::remove_var("NATS_SUBJECT_PREFIX");
|
||||
std::env::remove_var("NATS_STREAM_NAME");
|
||||
std::env::remove_var("NATS_CONSUMER_NAME");
|
||||
}
|
||||
|
||||
let cfg = NatsConfig::from_env().unwrap();
|
||||
assert_eq!(cfg.url, "nats://localhost:4222");
|
||||
assert_eq!(cfg.mode, NatsMode::JetStream);
|
||||
assert_eq!(cfg.subject_prefix, "movies-diary.events");
|
||||
assert_eq!(cfg.stream_name, "MOVIES_DIARY_EVENTS");
|
||||
assert_eq!(cfg.consumer_name, "worker");
|
||||
|
||||
unsafe { std::env::remove_var("NATS_URL"); }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn core_mode_parsed() {
|
||||
unsafe {
|
||||
std::env::set_var("NATS_URL", "nats://test:4222");
|
||||
std::env::set_var("NATS_MODE", "core");
|
||||
}
|
||||
|
||||
let cfg = NatsConfig::from_env().unwrap();
|
||||
assert_eq!(cfg.mode, NatsMode::Core);
|
||||
|
||||
unsafe {
|
||||
std::env::remove_var("NATS_URL");
|
||||
std::env::remove_var("NATS_MODE");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_mode_errors() {
|
||||
unsafe {
|
||||
std::env::set_var("NATS_URL", "nats://test:4222");
|
||||
std::env::set_var("NATS_MODE", "kafka");
|
||||
}
|
||||
|
||||
assert!(NatsConfig::from_env().is_err());
|
||||
|
||||
unsafe {
|
||||
std::env::remove_var("NATS_URL");
|
||||
std::env::remove_var("NATS_MODE");
|
||||
}
|
||||
}
|
||||
}
|
||||
213
crates/adapters/nats/src/consumer.rs
Normal file
213
crates/adapters/nats/src/consumer.rs
Normal file
@@ -0,0 +1,213 @@
|
||||
use async_nats::{
|
||||
Client,
|
||||
jetstream::{self, consumer::pull, message::AckKind, stream::Config as StreamConfig},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::{AckHandle, DomainEvent, EventEnvelope},
|
||||
ports::EventConsumer,
|
||||
};
|
||||
use futures::{
|
||||
StreamExt,
|
||||
stream::{self, BoxStream},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{Mutex, mpsc};
|
||||
|
||||
use crate::{config::NatsConfig, payload::NatsEventPayload, subject::consumer_subject_filter};
|
||||
|
||||
// ── JetStream ack handle ─────────────────────────────────────────────────────
|
||||
|
||||
struct NatsJetStreamAckHandle {
|
||||
message: async_nats::jetstream::Message,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for NatsJetStreamAckHandle {
|
||||
async fn ack(&self) -> Result<(), DomainError> {
|
||||
tracing::debug!(
|
||||
"acknowledging message with sequence {}",
|
||||
self.message.info().unwrap().stream_sequence
|
||||
);
|
||||
|
||||
self.message
|
||||
.ack()
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||
}
|
||||
|
||||
async fn nack(&self) -> Result<(), DomainError> {
|
||||
tracing::debug!(
|
||||
"negatively acknowledging message with sequence {}",
|
||||
self.message.info().unwrap().stream_sequence
|
||||
);
|
||||
|
||||
self.message
|
||||
.ack_with(AckKind::Nak(None))
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Core NATS ack handle (no-op) ─────────────────────────────────────────────
|
||||
|
||||
struct NoopAck;
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for NoopAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
async fn nack(&self) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ── Envelope construction helpers ────────────────────────────────────────────
|
||||
|
||||
fn decode_js(msg: async_nats::jetstream::Message) -> Result<EventEnvelope, DomainError> {
|
||||
let payload: NatsEventPayload = serde_json::from_slice(&msg.payload)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
||||
let event = DomainEvent::try_from(payload)?;
|
||||
Ok(EventEnvelope::new(
|
||||
event,
|
||||
Box::new(NatsJetStreamAckHandle { message: msg }),
|
||||
))
|
||||
}
|
||||
|
||||
fn decode_core(msg: async_nats::Message) -> Result<EventEnvelope, DomainError> {
|
||||
let payload: NatsEventPayload = serde_json::from_slice(&msg.payload)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
||||
let event = DomainEvent::try_from(payload)?;
|
||||
Ok(EventEnvelope::new(event, Box::new(NoopAck)))
|
||||
}
|
||||
|
||||
// ── Channel-bridge shared by both consumers ──────────────────────────────────
|
||||
|
||||
type EnvelopeRx = Arc<Mutex<mpsc::Receiver<Result<EventEnvelope, DomainError>>>>;
|
||||
|
||||
fn consume_from_rx(rx: EnvelopeRx) -> BoxStream<'static, Result<EventEnvelope, DomainError>> {
|
||||
Box::pin(stream::unfold(rx, |rx| async move {
|
||||
let item = rx.lock().await.recv().await?;
|
||||
Some((item, rx))
|
||||
}))
|
||||
}
|
||||
|
||||
// ── JetStream consumer ────────────────────────────────────────────────────────
|
||||
|
||||
pub struct NatsJetStreamConsumer {
|
||||
rx: EnvelopeRx,
|
||||
}
|
||||
|
||||
impl NatsJetStreamConsumer {
|
||||
pub async fn create(cfg: &NatsConfig, client: Client) -> anyhow::Result<Self> {
|
||||
let js = jetstream::new(client);
|
||||
|
||||
let stream = js
|
||||
.get_or_create_stream(StreamConfig {
|
||||
name: cfg.stream_name.clone(),
|
||||
subjects: vec![consumer_subject_filter(&cfg.subject_prefix)],
|
||||
max_messages: 100_000,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let subject_filter = consumer_subject_filter(&cfg.subject_prefix);
|
||||
let consumer = stream
|
||||
.get_or_create_consumer(
|
||||
cfg.consumer_name.as_str(),
|
||||
pull::Config {
|
||||
durable_name: Some(cfg.consumer_name.clone()),
|
||||
filter_subject: subject_filter,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (tx, rx) = mpsc::channel(128);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let mut messages = match consumer.messages().await {
|
||||
Err(e) => {
|
||||
tracing::error!("failed to fetch messages: {}", e);
|
||||
|
||||
let _ = tx
|
||||
.send(Err(DomainError::InfrastructureError(e.to_string())))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Ok(m) => m,
|
||||
};
|
||||
while let Some(result) = messages.next().await {
|
||||
let envelope = result
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||
.and_then(decode_js);
|
||||
|
||||
if tx.send(envelope).await.is_err() {
|
||||
tracing::info!("consumer channel closed, stopping message processing");
|
||||
return;
|
||||
}
|
||||
|
||||
tracing::debug!("message sent to consumer channel");
|
||||
}
|
||||
// messages() stream ended (fetch expired in strict mode) — restart
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
rx: Arc::new(Mutex::new(rx)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl EventConsumer for NatsJetStreamConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
consume_from_rx(Arc::clone(&self.rx))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Core NATS consumer ────────────────────────────────────────────────────────
|
||||
|
||||
pub struct NatsCoreConsumer {
|
||||
rx: EnvelopeRx,
|
||||
}
|
||||
|
||||
impl NatsCoreConsumer {
|
||||
pub async fn create(cfg: &NatsConfig, client: Client) -> anyhow::Result<Self> {
|
||||
let subject = consumer_subject_filter(&cfg.subject_prefix);
|
||||
let mut subscriber = client.subscribe(subject).await?;
|
||||
|
||||
let (tx, rx) = mpsc::channel(128);
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(msg) = subscriber.next().await {
|
||||
let envelope = decode_core(msg);
|
||||
|
||||
tracing::debug!("message received and decoded, sending to consumer channel");
|
||||
|
||||
if tx.send(envelope).await.is_err() {
|
||||
tracing::info!("consumer channel closed, stopping message processing");
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
rx: Arc::new(Mutex::new(rx)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl EventConsumer for NatsCoreConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
consume_from_rx(Arc::clone(&self.rx))
|
||||
}
|
||||
}
|
||||
|
||||
fn _assert_send_sync() {
|
||||
fn check<T: Send + Sync>() {}
|
||||
check::<NatsJetStreamConsumer>();
|
||||
check::<NatsCoreConsumer>();
|
||||
}
|
||||
52
crates/adapters/nats/src/lib.rs
Normal file
52
crates/adapters/nats/src/lib.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
mod config;
|
||||
mod consumer;
|
||||
mod payload;
|
||||
mod publisher;
|
||||
mod subject;
|
||||
|
||||
pub use config::{NatsConfig, NatsMode};
|
||||
pub use consumer::{NatsCoreConsumer, NatsJetStreamConsumer};
|
||||
pub use publisher::NatsEventPublisher;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use domain::ports::{EventConsumer, EventPublisher};
|
||||
|
||||
pub async fn create_publisher(cfg: NatsConfig) -> anyhow::Result<Arc<dyn EventPublisher>> {
|
||||
let client = async_nats::connect(&cfg.url).await?;
|
||||
let publisher: Arc<dyn EventPublisher> = match cfg.mode {
|
||||
NatsMode::Core => Arc::new(NatsEventPublisher::new_core(client, cfg.subject_prefix)),
|
||||
NatsMode::JetStream => Arc::new(NatsEventPublisher::new_jetstream(
|
||||
client,
|
||||
cfg.subject_prefix,
|
||||
)),
|
||||
};
|
||||
|
||||
tracing::info!("NATS publisher created (mode: {:?})", cfg.mode);
|
||||
Ok(publisher)
|
||||
}
|
||||
|
||||
pub async fn create_channel(
|
||||
cfg: NatsConfig,
|
||||
) -> anyhow::Result<(Arc<dyn EventPublisher>, Arc<dyn EventConsumer>)> {
|
||||
let client = async_nats::connect(&cfg.url).await?;
|
||||
|
||||
let publisher: Arc<dyn EventPublisher> = match cfg.mode {
|
||||
NatsMode::Core => Arc::new(NatsEventPublisher::new_core(
|
||||
client.clone(),
|
||||
cfg.subject_prefix.clone(),
|
||||
)),
|
||||
NatsMode::JetStream => Arc::new(NatsEventPublisher::new_jetstream(
|
||||
client.clone(),
|
||||
cfg.subject_prefix.clone(),
|
||||
)),
|
||||
};
|
||||
|
||||
let consumer: Arc<dyn EventConsumer> = match cfg.mode {
|
||||
NatsMode::Core => Arc::new(NatsCoreConsumer::create(&cfg, client).await?),
|
||||
NatsMode::JetStream => Arc::new(NatsJetStreamConsumer::create(&cfg, client).await?),
|
||||
};
|
||||
|
||||
tracing::info!("NATS channel created (mode: {:?})", cfg.mode);
|
||||
Ok((publisher, consumer))
|
||||
}
|
||||
172
crates/adapters/nats/src/payload.rs
Normal file
172
crates/adapters/nats/src/payload.rs
Normal file
@@ -0,0 +1,172 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum NatsEventPayload {
|
||||
ReviewLogged {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
ReviewUpdated {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
MovieDiscovered {
|
||||
movie_id: String,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||
Uuid::parse_str(s)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||
}
|
||||
|
||||
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||
chrono::DateTime::from_timestamp(ts, 0)
|
||||
.map(|dt| dt.naive_utc())
|
||||
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||
}
|
||||
|
||||
impl From<&DomainEvent> for NatsEventPayload {
|
||||
fn from(event: &DomainEvent) -> Self {
|
||||
match event {
|
||||
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
NatsEventPayload::ReviewLogged {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
NatsEventPayload::ReviewUpdated {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
NatsEventPayload::MovieDiscovered {
|
||||
movie_id: movie_id.value().to_string(),
|
||||
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<NatsEventPayload> for DomainEvent {
|
||||
type Error = DomainError;
|
||||
fn try_from(payload: NatsEventPayload) -> Result<Self, DomainError> {
|
||||
match payload {
|
||||
NatsEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
NatsEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
NatsEventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
Ok(DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn fixed_dt() -> NaiveDateTime {
|
||||
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||
}
|
||||
|
||||
fn review_logged() -> DomainEvent {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(4).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn review_updated() -> DomainEvent {
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(3).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn round_trip(event: DomainEvent) {
|
||||
let payload = NatsEventPayload::from(&event);
|
||||
let json = serde_json::to_string(&payload).expect("serialize");
|
||||
let back: NatsEventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||
assert_eq!(NatsEventPayload::from(&event), NatsEventPayload::from(&recovered));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_logged() {
|
||||
round_trip(review_logged());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_updated() {
|
||||
round_trip(review_updated());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_movie_discovered() {
|
||||
round_trip(movie_discovered());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialized_format_is_tagged() {
|
||||
let payload = NatsEventPayload::from(&movie_discovered());
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||
assert!(json.contains(r#""data":"#));
|
||||
}
|
||||
}
|
||||
54
crates/adapters/nats/src/publisher.rs
Normal file
54
crates/adapters/nats/src/publisher.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use async_nats::{jetstream, Client};
|
||||
use async_trait::async_trait;
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||
|
||||
use crate::{payload::NatsEventPayload, subject::event_to_subject};
|
||||
|
||||
enum PublisherMode {
|
||||
Core(Client),
|
||||
JetStream(jetstream::Context),
|
||||
}
|
||||
|
||||
pub struct NatsEventPublisher {
|
||||
mode: PublisherMode,
|
||||
subject_prefix: String,
|
||||
}
|
||||
|
||||
impl NatsEventPublisher {
|
||||
pub fn new_core(client: Client, subject_prefix: String) -> Self {
|
||||
Self { mode: PublisherMode::Core(client), subject_prefix }
|
||||
}
|
||||
|
||||
pub fn new_jetstream(client: Client, subject_prefix: String) -> Self {
|
||||
Self { mode: PublisherMode::JetStream(jetstream::new(client)), subject_prefix }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventPublisher for NatsEventPublisher {
|
||||
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let subject = event_to_subject(&self.subject_prefix, event);
|
||||
let payload = serde_json::to_vec(&NatsEventPayload::from(event))
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
|
||||
match &self.mode {
|
||||
PublisherMode::Core(client) => client
|
||||
.publish(subject, payload.into())
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string())),
|
||||
|
||||
PublisherMode::JetStream(js) => js
|
||||
.publish(subject, payload.into())
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn _assert_send_sync() {
|
||||
fn check<T: Send + Sync>() {}
|
||||
check::<NatsEventPublisher>();
|
||||
}
|
||||
76
crates/adapters/nats/src/subject.rs
Normal file
76
crates/adapters/nats/src/subject.rs
Normal file
@@ -0,0 +1,76 @@
|
||||
use domain::events::DomainEvent;
|
||||
|
||||
pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
|
||||
let suffix = match event {
|
||||
DomainEvent::ReviewLogged { .. } => "review.logged",
|
||||
DomainEvent::ReviewUpdated { .. } => "review.updated",
|
||||
DomainEvent::MovieDiscovered { .. } => "movie.discovered",
|
||||
};
|
||||
format!("{prefix}.{suffix}")
|
||||
}
|
||||
|
||||
pub fn consumer_subject_filter(prefix: &str) -> String {
|
||||
format!("{prefix}.>")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId};
|
||||
use uuid::Uuid;
|
||||
|
||||
fn dt() -> NaiveDateTime {
|
||||
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn review_logged_subject() {
|
||||
let event = DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(3).unwrap(),
|
||||
watched_at: dt(),
|
||||
};
|
||||
assert_eq!(
|
||||
event_to_subject("movies-diary.events", &event),
|
||||
"movies-diary.events.review.logged"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn review_updated_subject() {
|
||||
let event = DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(5).unwrap(),
|
||||
watched_at: dt(),
|
||||
};
|
||||
assert_eq!(
|
||||
event_to_subject("movies-diary.events", &event),
|
||||
"movies-diary.events.review.updated"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn movie_discovered_subject() {
|
||||
let event = DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
external_metadata_id: ExternalMetadataId::new("tt0000001".into()).unwrap(),
|
||||
};
|
||||
assert_eq!(
|
||||
event_to_subject("movies-diary.events", &event),
|
||||
"movies-diary.events.movie.discovered"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn consumer_subject_filter_appends_wildcard() {
|
||||
assert_eq!(
|
||||
consumer_subject_filter("movies-diary.events"),
|
||||
"movies-diary.events.>"
|
||||
);
|
||||
}
|
||||
}
|
||||
17
crates/adapters/postgres-event-queue/Cargo.toml
Normal file
17
crates/adapters/postgres-event-queue/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "postgres-event-queue"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono"] }
|
||||
domain = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
@@ -0,0 +1,13 @@
|
||||
CREATE TABLE IF NOT EXISTS event_queue (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
event_type TEXT NOT NULL,
|
||||
payload TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
last_error TEXT
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_event_queue_poll
|
||||
ON event_queue (status, next_attempt_at);
|
||||
225
crates/adapters/postgres-event-queue/src/lib.rs
Normal file
225
crates/adapters/postgres-event-queue/src/lib.rs
Normal file
@@ -0,0 +1,225 @@
|
||||
mod migrations;
|
||||
mod payload;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::{AckHandle, DomainEvent, EventEnvelope},
|
||||
ports::{EventConsumer, EventPublisher},
|
||||
};
|
||||
use futures::stream::{self, BoxStream};
|
||||
use sqlx::PgPool;
|
||||
use tokio::sync::{Mutex, mpsc};
|
||||
|
||||
use payload::DbEventPayload;
|
||||
|
||||
pub struct DbEventQueueConfig {
|
||||
pub poll_interval_ms: u64,
|
||||
pub batch_size: i64,
|
||||
pub max_attempts: i32,
|
||||
}
|
||||
|
||||
impl DbEventQueueConfig {
|
||||
pub fn from_env() -> Self {
|
||||
Self {
|
||||
poll_interval_ms: std::env::var("EVENT_QUEUE_POLL_INTERVAL_MS")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(500),
|
||||
batch_size: std::env::var("EVENT_QUEUE_BATCH_SIZE")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(10),
|
||||
max_attempts: std::env::var("EVENT_QUEUE_MAX_ATTEMPTS")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(5),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PostgresEventQueue {
|
||||
pool: PgPool,
|
||||
config: Arc<DbEventQueueConfig>,
|
||||
}
|
||||
|
||||
impl PostgresEventQueue {
|
||||
pub async fn create(pool: PgPool, config: DbEventQueueConfig) -> anyhow::Result<Self> {
|
||||
migrations::run(&pool).await?;
|
||||
Ok(Self { pool, config: Arc::new(config) })
|
||||
}
|
||||
|
||||
pub async fn create_publisher(pool: PgPool) -> anyhow::Result<Arc<dyn EventPublisher>> {
|
||||
let q = Self::create(pool, DbEventQueueConfig::from_env()).await?;
|
||||
Ok(Arc::new(q))
|
||||
}
|
||||
|
||||
pub async fn create_channel(
|
||||
pool: PgPool,
|
||||
) -> anyhow::Result<(Arc<dyn EventPublisher>, Arc<dyn EventConsumer>)> {
|
||||
let q = Self::create(pool, DbEventQueueConfig::from_env()).await?;
|
||||
Ok((Arc::new(q.clone()), Arc::new(q)))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventPublisher for PostgresEventQueue {
|
||||
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let db_payload = DbEventPayload::from(event);
|
||||
let event_type = db_payload.event_type();
|
||||
let payload_json = serde_json::to_string(&db_payload)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("serialize: {e}")))?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO event_queue (event_type, payload) VALUES ($1, $2)"
|
||||
)
|
||||
.bind(event_type)
|
||||
.bind(payload_json)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("insert event: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl EventConsumer for PostgresEventQueue {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let pool = self.pool.clone();
|
||||
let config = Arc::clone(&self.config);
|
||||
let (tx, rx) = mpsc::channel(128);
|
||||
let rx = Arc::new(Mutex::new(rx));
|
||||
|
||||
tokio::spawn(async move {
|
||||
let poll_interval = Duration::from_millis(config.poll_interval_ms);
|
||||
loop {
|
||||
match claim_batch(&pool, &config).await {
|
||||
Err(e) => {
|
||||
tracing::error!("postgres event queue claim error: {e}");
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
Ok(rows) if rows.is_empty() => {
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
Ok(rows) => {
|
||||
for row in rows {
|
||||
let envelope = decode_row(&pool, row, config.max_attempts);
|
||||
if tx.send(envelope).await.is_err() {
|
||||
tracing::info!("postgres event queue consumer closed");
|
||||
return;
|
||||
}
|
||||
}
|
||||
// no sleep — re-poll immediately when batch was non-empty
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Box::pin(stream::unfold(rx, |rx| async move {
|
||||
let item = rx.lock().await.recv().await?;
|
||||
Some((item, rx))
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Internal types ────────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct QueueRow {
|
||||
id: i64,
|
||||
payload: String,
|
||||
attempts: i32,
|
||||
}
|
||||
|
||||
async fn claim_batch(
|
||||
pool: &PgPool,
|
||||
config: &DbEventQueueConfig,
|
||||
) -> Result<Vec<QueueRow>, DomainError> {
|
||||
// CTE with FOR UPDATE SKIP LOCKED — atomic and safe for multiple workers
|
||||
let rows = sqlx::query_as::<_, QueueRow>(
|
||||
r#"
|
||||
WITH claimed AS (
|
||||
SELECT id FROM event_queue
|
||||
WHERE status = 'pending' AND next_attempt_at <= NOW()
|
||||
ORDER BY next_attempt_at ASC
|
||||
LIMIT $1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
UPDATE event_queue q
|
||||
SET status = 'processing'
|
||||
FROM claimed
|
||||
WHERE q.id = claimed.id
|
||||
RETURNING q.id, q.payload, q.attempts
|
||||
"#
|
||||
)
|
||||
.bind(config.batch_size)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("claim batch: {e}")))?;
|
||||
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn decode_row(
|
||||
pool: &PgPool,
|
||||
row: QueueRow,
|
||||
max_attempts: i32,
|
||||
) -> Result<EventEnvelope, DomainError> {
|
||||
let db_payload: DbEventPayload = serde_json::from_str(&row.payload)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
||||
let event = DomainEvent::try_from(db_payload)?;
|
||||
Ok(EventEnvelope::new(event, Box::new(DbAckHandle {
|
||||
pool: pool.clone(),
|
||||
row_id: row.id,
|
||||
attempts: row.attempts,
|
||||
max_attempts,
|
||||
})))
|
||||
}
|
||||
|
||||
struct DbAckHandle {
|
||||
pool: PgPool,
|
||||
row_id: i64,
|
||||
attempts: i32,
|
||||
max_attempts: i32,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for DbAckHandle {
|
||||
async fn ack(&self) -> Result<(), DomainError> {
|
||||
sqlx::query("UPDATE event_queue SET status = 'done' WHERE id = $1")
|
||||
.bind(self.row_id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("ack: {e}")))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn nack(&self) -> Result<(), DomainError> {
|
||||
let new_attempts = self.attempts + 1;
|
||||
if new_attempts >= self.max_attempts {
|
||||
sqlx::query(
|
||||
"UPDATE event_queue SET status = 'dead_lettered', attempts = $1, last_error = 'max attempts reached' WHERE id = $2"
|
||||
)
|
||||
.bind(new_attempts)
|
||||
.bind(self.row_id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("nack dead-letter: {e}")))?;
|
||||
} else {
|
||||
let backoff = backoff_seconds(new_attempts).to_string();
|
||||
sqlx::query(
|
||||
"UPDATE event_queue SET status = 'pending', attempts = $1, next_attempt_at = NOW() + ($2 || ' seconds')::interval, last_error = 'nack' WHERE id = $3"
|
||||
)
|
||||
.bind(new_attempts)
|
||||
.bind(backoff)
|
||||
.bind(self.row_id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("nack retry: {e}")))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn backoff_seconds(attempts: i32) -> i64 {
|
||||
let base: i64 = 5 * (1i64 << attempts.min(6));
|
||||
base.min(300)
|
||||
}
|
||||
7
crates/adapters/postgres-event-queue/src/migrations.rs
Normal file
7
crates/adapters/postgres-event-queue/src/migrations.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
pub(crate) async fn run(pool: &sqlx::PgPool) -> anyhow::Result<()> {
|
||||
sqlx::migrate!("./migrations")
|
||||
.set_ignore_missing(true)
|
||||
.run(pool)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("postgres-event-queue migration failed: {e}"))
|
||||
}
|
||||
189
crates/adapters/postgres-event-queue/src/payload.rs
Normal file
189
crates/adapters/postgres-event-queue/src/payload.rs
Normal file
@@ -0,0 +1,189 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum DbEventPayload {
|
||||
ReviewLogged {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
ReviewUpdated {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
MovieDiscovered {
|
||||
movie_id: String,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl DbEventPayload {
|
||||
pub fn event_type(&self) -> &'static str {
|
||||
match self {
|
||||
DbEventPayload::ReviewLogged { .. } => "ReviewLogged",
|
||||
DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
||||
DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||
Uuid::parse_str(s)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||
}
|
||||
|
||||
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||
chrono::DateTime::from_timestamp(ts, 0)
|
||||
.map(|dt| dt.naive_utc())
|
||||
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||
}
|
||||
|
||||
impl From<&DomainEvent> for DbEventPayload {
|
||||
fn from(event: &DomainEvent) -> Self {
|
||||
match event {
|
||||
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
DbEventPayload::ReviewLogged {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
DbEventPayload::ReviewUpdated {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
DbEventPayload::MovieDiscovered {
|
||||
movie_id: movie_id.value().to_string(),
|
||||
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DbEventPayload> for DomainEvent {
|
||||
type Error = DomainError;
|
||||
fn try_from(payload: DbEventPayload) -> Result<Self, DomainError> {
|
||||
match payload {
|
||||
DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
Ok(DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn fixed_dt() -> NaiveDateTime {
|
||||
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||
}
|
||||
|
||||
fn review_logged() -> DomainEvent {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(4).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn review_updated() -> DomainEvent {
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(3).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn round_trip(event: DomainEvent) {
|
||||
let payload = DbEventPayload::from(&event);
|
||||
let json = serde_json::to_string(&payload).expect("serialize");
|
||||
let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||
assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_logged() {
|
||||
round_trip(review_logged());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_updated() {
|
||||
round_trip(review_updated());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_movie_discovered() {
|
||||
round_trip(movie_discovered());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialized_format_is_tagged() {
|
||||
let payload = DbEventPayload::from(&movie_discovered());
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||
assert!(json.contains(r#""data":"#));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_type_strings() {
|
||||
assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged");
|
||||
assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated");
|
||||
assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered");
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,7 @@ impl PostgresRepository {
|
||||
|
||||
pub async fn migrate(&self) -> Result<(), DomainError> {
|
||||
sqlx::migrate!("./migrations")
|
||||
.set_ignore_missing(true)
|
||||
.run(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("Migration failed: {}", e)))
|
||||
|
||||
17
crates/adapters/sqlite-event-queue/Cargo.toml
Normal file
17
crates/adapters/sqlite-event-queue/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "sqlite-event-queue"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "sqlite", "macros", "chrono"] }
|
||||
domain = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
@@ -0,0 +1,13 @@
|
||||
CREATE TABLE IF NOT EXISTS event_queue (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
event_type TEXT NOT NULL,
|
||||
payload TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
|
||||
next_attempt_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
|
||||
last_error TEXT
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_event_queue_poll
|
||||
ON event_queue (status, next_attempt_at);
|
||||
236
crates/adapters/sqlite-event-queue/src/lib.rs
Normal file
236
crates/adapters/sqlite-event-queue/src/lib.rs
Normal file
@@ -0,0 +1,236 @@
|
||||
mod migrations;
|
||||
mod payload;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::{AckHandle, DomainEvent, EventEnvelope},
|
||||
ports::{EventConsumer, EventPublisher},
|
||||
};
|
||||
use futures::stream::{self, BoxStream};
|
||||
use sqlx::SqlitePool;
|
||||
use tokio::sync::{Mutex, mpsc};
|
||||
|
||||
use payload::DbEventPayload;
|
||||
|
||||
pub struct DbEventQueueConfig {
|
||||
pub poll_interval_ms: u64,
|
||||
pub batch_size: i64,
|
||||
pub max_attempts: i32,
|
||||
}
|
||||
|
||||
impl DbEventQueueConfig {
|
||||
pub fn from_env() -> Self {
|
||||
Self {
|
||||
poll_interval_ms: std::env::var("EVENT_QUEUE_POLL_INTERVAL_MS")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(500),
|
||||
batch_size: std::env::var("EVENT_QUEUE_BATCH_SIZE")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(10),
|
||||
max_attempts: std::env::var("EVENT_QUEUE_MAX_ATTEMPTS")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(5),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SqliteEventQueue {
|
||||
pool: SqlitePool,
|
||||
config: Arc<DbEventQueueConfig>,
|
||||
}
|
||||
|
||||
impl SqliteEventQueue {
|
||||
pub async fn create(pool: SqlitePool, config: DbEventQueueConfig) -> anyhow::Result<Self> {
|
||||
migrations::run(&pool).await?;
|
||||
Ok(Self { pool, config: Arc::new(config) })
|
||||
}
|
||||
|
||||
pub async fn create_publisher(pool: SqlitePool) -> anyhow::Result<Arc<dyn EventPublisher>> {
|
||||
let q = Self::create(pool, DbEventQueueConfig::from_env()).await?;
|
||||
Ok(Arc::new(q))
|
||||
}
|
||||
|
||||
pub async fn create_channel(
|
||||
pool: SqlitePool,
|
||||
) -> anyhow::Result<(Arc<dyn EventPublisher>, Arc<dyn EventConsumer>)> {
|
||||
let q = Self::create(pool, DbEventQueueConfig::from_env()).await?;
|
||||
Ok((Arc::new(q.clone()), Arc::new(q)))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventPublisher for SqliteEventQueue {
|
||||
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let db_payload = DbEventPayload::from(event);
|
||||
let event_type = db_payload.event_type();
|
||||
let payload_json = serde_json::to_string(&db_payload)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("serialize: {e}")))?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO event_queue (event_type, payload) VALUES (?, ?)"
|
||||
)
|
||||
.bind(event_type)
|
||||
.bind(payload_json)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("insert event: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl EventConsumer for SqliteEventQueue {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let pool = self.pool.clone();
|
||||
let config = Arc::clone(&self.config);
|
||||
let (tx, rx) = mpsc::channel(128);
|
||||
let rx = Arc::new(Mutex::new(rx));
|
||||
|
||||
tokio::spawn(async move {
|
||||
let poll_interval = Duration::from_millis(config.poll_interval_ms);
|
||||
loop {
|
||||
match claim_batch(&pool, &config).await {
|
||||
Err(e) => {
|
||||
tracing::error!("sqlite event queue claim error: {e}");
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
Ok(rows) if rows.is_empty() => {
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
Ok(rows) => {
|
||||
for row in rows {
|
||||
let envelope = decode_row(&pool, row, config.max_attempts);
|
||||
if tx.send(envelope).await.is_err() {
|
||||
tracing::info!("sqlite event queue consumer closed");
|
||||
return;
|
||||
}
|
||||
}
|
||||
// no sleep — re-poll immediately when batch was non-empty
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Box::pin(stream::unfold(rx, |rx| async move {
|
||||
let item = rx.lock().await.recv().await?;
|
||||
Some((item, rx))
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Internal types ────────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct QueueRow {
|
||||
id: i64,
|
||||
payload: String,
|
||||
attempts: i32,
|
||||
}
|
||||
|
||||
async fn claim_batch(
|
||||
pool: &SqlitePool,
|
||||
config: &DbEventQueueConfig,
|
||||
) -> Result<Vec<QueueRow>, DomainError> {
|
||||
let mut tx = pool.begin().await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("begin tx: {e}")))?;
|
||||
|
||||
let rows = sqlx::query_as::<_, QueueRow>(
|
||||
"SELECT id, payload, attempts FROM event_queue
|
||||
WHERE status = 'pending'
|
||||
AND next_attempt_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
|
||||
ORDER BY next_attempt_at ASC
|
||||
LIMIT ?"
|
||||
)
|
||||
.bind(config.batch_size)
|
||||
.fetch_all(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("select pending: {e}")))?;
|
||||
|
||||
if rows.is_empty() {
|
||||
tx.rollback().await.ok();
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let placeholders = rows.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
|
||||
let sql = format!(
|
||||
"UPDATE event_queue SET status = 'processing' WHERE id IN ({})",
|
||||
placeholders
|
||||
);
|
||||
let mut q = sqlx::query(&sql);
|
||||
for r in &rows { q = q.bind(r.id); }
|
||||
q.execute(&mut *tx).await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("mark processing: {e}")))?;
|
||||
|
||||
tx.commit().await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("commit claim: {e}")))?;
|
||||
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn decode_row(
|
||||
pool: &SqlitePool,
|
||||
row: QueueRow,
|
||||
max_attempts: i32,
|
||||
) -> Result<EventEnvelope, DomainError> {
|
||||
let db_payload: DbEventPayload = serde_json::from_str(&row.payload)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
||||
let event = DomainEvent::try_from(db_payload)?;
|
||||
Ok(EventEnvelope::new(event, Box::new(DbAckHandle {
|
||||
pool: pool.clone(),
|
||||
row_id: row.id,
|
||||
attempts: row.attempts,
|
||||
max_attempts,
|
||||
})))
|
||||
}
|
||||
|
||||
struct DbAckHandle {
|
||||
pool: SqlitePool,
|
||||
row_id: i64,
|
||||
attempts: i32,
|
||||
max_attempts: i32,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for DbAckHandle {
|
||||
async fn ack(&self) -> Result<(), DomainError> {
|
||||
sqlx::query("UPDATE event_queue SET status = 'done' WHERE id = ?")
|
||||
.bind(self.row_id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("ack: {e}")))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn nack(&self) -> Result<(), DomainError> {
|
||||
let new_attempts = self.attempts + 1;
|
||||
if new_attempts >= self.max_attempts {
|
||||
sqlx::query(
|
||||
"UPDATE event_queue SET status = 'dead_lettered', attempts = ?, last_error = 'max attempts reached' WHERE id = ?"
|
||||
)
|
||||
.bind(new_attempts)
|
||||
.bind(self.row_id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("nack dead-letter: {e}")))?;
|
||||
} else {
|
||||
let backoff = backoff_seconds(new_attempts);
|
||||
let sql = format!(
|
||||
"UPDATE event_queue SET status = 'pending', attempts = ?, next_attempt_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now', '+{backoff} seconds'), last_error = 'nack' WHERE id = ?"
|
||||
);
|
||||
sqlx::query(&sql)
|
||||
.bind(new_attempts)
|
||||
.bind(self.row_id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("nack retry: {e}")))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn backoff_seconds(attempts: i32) -> i64 {
|
||||
let base: i64 = 5 * (1i64 << attempts.min(6));
|
||||
base.min(300)
|
||||
}
|
||||
7
crates/adapters/sqlite-event-queue/src/migrations.rs
Normal file
7
crates/adapters/sqlite-event-queue/src/migrations.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
pub(crate) async fn run(pool: &sqlx::SqlitePool) -> anyhow::Result<()> {
|
||||
sqlx::migrate!("./migrations")
|
||||
.set_ignore_missing(true)
|
||||
.run(pool)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("sqlite-event-queue migration failed: {e}"))
|
||||
}
|
||||
189
crates/adapters/sqlite-event-queue/src/payload.rs
Normal file
189
crates/adapters/sqlite-event-queue/src/payload.rs
Normal file
@@ -0,0 +1,189 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum DbEventPayload {
|
||||
ReviewLogged {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
ReviewUpdated {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
MovieDiscovered {
|
||||
movie_id: String,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl DbEventPayload {
|
||||
pub fn event_type(&self) -> &'static str {
|
||||
match self {
|
||||
DbEventPayload::ReviewLogged { .. } => "ReviewLogged",
|
||||
DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
||||
DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||
Uuid::parse_str(s)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||
}
|
||||
|
||||
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||
chrono::DateTime::from_timestamp(ts, 0)
|
||||
.map(|dt| dt.naive_utc())
|
||||
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||
}
|
||||
|
||||
impl From<&DomainEvent> for DbEventPayload {
|
||||
fn from(event: &DomainEvent) -> Self {
|
||||
match event {
|
||||
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
DbEventPayload::ReviewLogged {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
DbEventPayload::ReviewUpdated {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
DbEventPayload::MovieDiscovered {
|
||||
movie_id: movie_id.value().to_string(),
|
||||
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DbEventPayload> for DomainEvent {
|
||||
type Error = DomainError;
|
||||
fn try_from(payload: DbEventPayload) -> Result<Self, DomainError> {
|
||||
match payload {
|
||||
DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
Ok(DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn fixed_dt() -> NaiveDateTime {
|
||||
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||
}
|
||||
|
||||
fn review_logged() -> DomainEvent {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(4).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn review_updated() -> DomainEvent {
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(3).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn round_trip(event: DomainEvent) {
|
||||
let payload = DbEventPayload::from(&event);
|
||||
let json = serde_json::to_string(&payload).expect("serialize");
|
||||
let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||
assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_logged() {
|
||||
round_trip(review_logged());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_updated() {
|
||||
round_trip(review_updated());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_movie_discovered() {
|
||||
round_trip(movie_discovered());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialized_format_is_tagged() {
|
||||
let payload = DbEventPayload::from(&movie_discovered());
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||
assert!(json.contains(r#""data":"#));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_type_strings() {
|
||||
assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged");
|
||||
assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated");
|
||||
assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered");
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ use sqlx::SqlitePool;
|
||||
|
||||
pub(crate) async fn run(pool: &SqlitePool) -> Result<(), DomainError> {
|
||||
sqlx::migrate!("./migrations")
|
||||
.set_ignore_missing(true)
|
||||
.run(pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||
|
||||
@@ -20,7 +20,10 @@ impl WorkerService {
|
||||
let mut stream = self.consumer.consume();
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(envelope) => self.dispatch(envelope).await,
|
||||
Ok(envelope) => {
|
||||
tracing::info!(event = ?envelope.event, "received event");
|
||||
self.dispatch(envelope).await;
|
||||
}
|
||||
Err(e) => tracing::error!("event consumer error: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,12 +5,23 @@ edition = "2024"
|
||||
|
||||
[features]
|
||||
default = ["sqlite", "sqlite-federation"]
|
||||
sqlite = ["dep:sqlite"]
|
||||
postgres = ["dep:postgres"]
|
||||
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
||||
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
||||
nats = ["dep:nats"]
|
||||
# Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working
|
||||
federation = []
|
||||
sqlite-federation = ["sqlite", "dep:sqlite-federation", "dep:activitypub", "federation"]
|
||||
postgres-federation = ["postgres", "dep:postgres-federation", "dep:activitypub", "federation"]
|
||||
sqlite-federation = [
|
||||
"sqlite",
|
||||
"dep:sqlite-federation",
|
||||
"dep:activitypub",
|
||||
"federation",
|
||||
]
|
||||
postgres-federation = [
|
||||
"postgres",
|
||||
"dep:postgres-federation",
|
||||
"dep:activitypub",
|
||||
"federation",
|
||||
]
|
||||
|
||||
[dependencies]
|
||||
tower-http = { version = "0.6.8", features = ["fs", "trace", "tracing"] }
|
||||
@@ -39,6 +50,7 @@ poster-fetcher = { workspace = true }
|
||||
poster-storage = { workspace = true }
|
||||
template-askama = { workspace = true }
|
||||
event-publisher = { workspace = true }
|
||||
nats = { workspace = true, optional = true }
|
||||
rss = { workspace = true }
|
||||
export = { workspace = true }
|
||||
doc = { workspace = true }
|
||||
@@ -46,12 +58,14 @@ sqlx = { workspace = true }
|
||||
utoipa = { version = "5.5.0", features = ["axum_extras", "uuid"] }
|
||||
|
||||
# Optional — database backends
|
||||
sqlite = { workspace = true, optional = true }
|
||||
postgres = { workspace = true, optional = true }
|
||||
sqlite = { workspace = true, optional = true }
|
||||
postgres = { workspace = true, optional = true }
|
||||
sqlite-event-queue = { workspace = true, optional = true }
|
||||
postgres-event-queue = { workspace = true, optional = true }
|
||||
|
||||
# Optional — federation
|
||||
activitypub = { workspace = true, optional = true }
|
||||
sqlite-federation = { workspace = true, optional = true }
|
||||
activitypub = { workspace = true, optional = true }
|
||||
sqlite-federation = { workspace = true, optional = true }
|
||||
postgres-federation = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use event_publisher::{EventPublisherConfig, NoopEventPublisher, create_event_channel};
|
||||
use application::event_handlers::PosterSyncHandler;
|
||||
use std::str::FromStr;
|
||||
|
||||
use tokio::net::TcpListener;
|
||||
@@ -20,11 +18,11 @@ use postgres_federation::PostgresFederationRepository;
|
||||
|
||||
#[cfg(feature = "federation")]
|
||||
use activitypub::{
|
||||
ActivityPubEventHandler, ActivityPubPort, ActivityPubService, DomainUserRepoAdapter,
|
||||
ActivityPubPort, ActivityPubService, DomainUserRepoAdapter,
|
||||
ReviewObjectHandler,
|
||||
};
|
||||
|
||||
use application::{config::AppConfig, context::AppContext, worker::WorkerService};
|
||||
use application::{config::AppConfig, context::AppContext};
|
||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||
use export::ExportAdapter;
|
||||
use metadata::MetadataClientImpl;
|
||||
@@ -38,9 +36,9 @@ use presentation::{openapi::ApiDoc, routes, state::AppState};
|
||||
use utoipa::OpenApi as _;
|
||||
|
||||
use domain::ports::{
|
||||
AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository,
|
||||
PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository,
|
||||
UserRepository,
|
||||
AuthService, DiaryExporter, DiaryRepository, EventPublisher, MetadataClient,
|
||||
MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository,
|
||||
StatsRepository, UserRepository,
|
||||
};
|
||||
|
||||
#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
|
||||
@@ -89,69 +87,40 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||
|
||||
// Only track pools when the federation feature for that backend needs them
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
let mut sqlite_pool: Option<sqlx::SqlitePool> = None;
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
let mut pg_pool: Option<sqlx::PgPool> = None;
|
||||
|
||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
||||
(Arc<dyn MovieRepository>, Arc<dyn ReviewRepository>, Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>, Arc<dyn UserRepository>) =
|
||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) =
|
||||
match backend.as_str() {
|
||||
#[cfg(feature = "postgres")]
|
||||
"postgres" => {
|
||||
let (_pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
{ pg_pool = Some(_pool); }
|
||||
(m, r, d, s, u)
|
||||
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Postgres(pool))
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => {
|
||||
let (_pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
{ sqlite_pool = Some(_pool); }
|
||||
(m, r, d, s, u)
|
||||
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Sqlite(pool))
|
||||
}
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build (sqlite feature is not enabled)"),
|
||||
};
|
||||
|
||||
// Build handler context (used for poster sync handler)
|
||||
let handler_ctx = AppContext {
|
||||
movie_repository: Arc::clone(&movie_repository),
|
||||
review_repository: Arc::clone(&review_repository),
|
||||
diary_repository: Arc::clone(&diary_repository),
|
||||
diary_exporter: Arc::new(ExportAdapter) as Arc<dyn DiaryExporter>,
|
||||
stats_repository: Arc::clone(&stats_repository),
|
||||
metadata_client: Arc::clone(&metadata_client),
|
||||
poster_fetcher: Arc::clone(&poster_fetcher),
|
||||
poster_storage: Arc::clone(&poster_storage),
|
||||
event_publisher: Arc::new(NoopEventPublisher),
|
||||
auth_service: Arc::clone(&auth_service),
|
||||
password_hasher: Arc::clone(&password_hasher),
|
||||
user_repository: Arc::clone(&user_repository),
|
||||
config: app_config.clone(),
|
||||
};
|
||||
|
||||
// Wire up event channel, federation service, and ap_router
|
||||
let event_bus = EventBusBackend::from_env()?;
|
||||
|
||||
#[cfg(feature = "federation")]
|
||||
let (event_publisher_arc, ap_router, ap_service, social_query) = {
|
||||
let (federation_repo, social_query_arc, review_store): (
|
||||
Arc<dyn activitypub::FederationRepository>,
|
||||
Arc<dyn domain::ports::SocialQueryPort>,
|
||||
Arc<dyn activitypub::RemoteReviewRepository>,
|
||||
) = match backend.as_str() {
|
||||
) = match &db_pool {
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
"postgres" => {
|
||||
let pool = pg_pool.as_ref().unwrap().clone();
|
||||
let fed = Arc::new(PostgresFederationRepository::new(pool));
|
||||
DbPool::Postgres(pool) => {
|
||||
let fed = Arc::new(PostgresFederationRepository::new(pool.clone()));
|
||||
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
|
||||
}
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
_ => {
|
||||
let pool = sqlite_pool.as_ref().unwrap().clone();
|
||||
let fed = Arc::new(SqliteFederationRepository::new(pool));
|
||||
DbPool::Sqlite(pool) => {
|
||||
let fed = Arc::new(SqliteFederationRepository::new(pool.clone()));
|
||||
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
|
||||
}
|
||||
#[cfg(not(feature = "sqlite-federation"))]
|
||||
@@ -176,34 +145,60 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
||||
.await?,
|
||||
);
|
||||
let ap_router = concrete_ap_service.router();
|
||||
let ap_event_handler = ActivityPubEventHandler::new(
|
||||
Arc::clone(&concrete_ap_service),
|
||||
Arc::clone(&movie_repository),
|
||||
Arc::clone(&review_repository),
|
||||
app_config.base_url.clone(),
|
||||
);
|
||||
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
|
||||
|
||||
let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3));
|
||||
let (event_publisher, consumer) = create_event_channel(EventPublisherConfig::from_env());
|
||||
let worker = WorkerService::new(
|
||||
Arc::new(consumer),
|
||||
vec![poster_handler, Arc::new(ap_event_handler)],
|
||||
);
|
||||
tokio::spawn(worker.run());
|
||||
|
||||
let ep: Arc<dyn domain::ports::EventPublisher> = Arc::new(event_publisher);
|
||||
let ep: Arc<dyn EventPublisher> = match event_bus {
|
||||
EventBusBackend::Db => {
|
||||
tracing::info!("event bus: DB queue");
|
||||
match &db_pool {
|
||||
#[cfg(feature = "postgres")]
|
||||
DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_publisher(
|
||||
pool.clone()
|
||||
).await?,
|
||||
#[cfg(feature = "sqlite")]
|
||||
DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_publisher(
|
||||
pool.clone()
|
||||
).await?,
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "nats")]
|
||||
EventBusBackend::Nats => {
|
||||
let cfg = nats::NatsConfig::from_env()
|
||||
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
|
||||
tracing::info!("event bus: NATS ({})", cfg.url);
|
||||
nats::create_publisher(cfg).await?
|
||||
}
|
||||
};
|
||||
(ep, ap_router, ap_service_arc, social_query_arc)
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "federation"))]
|
||||
let (event_publisher_arc, ap_router): (Arc<dyn domain::ports::EventPublisher>, axum::Router) = {
|
||||
let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3));
|
||||
let (event_publisher, consumer) = create_event_channel(EventPublisherConfig::from_env());
|
||||
let worker = WorkerService::new(Arc::new(consumer), vec![poster_handler]);
|
||||
tokio::spawn(worker.run());
|
||||
(Arc::new(event_publisher), axum::Router::new())
|
||||
let event_publisher_arc: Arc<dyn EventPublisher> = match event_bus {
|
||||
EventBusBackend::Db => {
|
||||
tracing::info!("event bus: DB queue");
|
||||
match backend.as_str() {
|
||||
#[cfg(feature = "postgres")]
|
||||
"postgres" => postgres_event_queue::PostgresEventQueue::create_publisher(
|
||||
pg_pool.as_ref().unwrap().clone()
|
||||
).await?,
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => sqlite_event_queue::SqliteEventQueue::create_publisher(
|
||||
sqlite_pool.as_ref().unwrap().clone()
|
||||
).await?,
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "nats")]
|
||||
EventBusBackend::Nats => {
|
||||
let cfg = nats::NatsConfig::from_env()
|
||||
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
|
||||
tracing::info!("event bus: NATS ({})", cfg.url);
|
||||
nats::create_publisher(cfg).await?
|
||||
}
|
||||
};
|
||||
#[cfg(not(feature = "federation"))]
|
||||
let ap_router = axum::Router::new();
|
||||
|
||||
let app_ctx = AppContext {
|
||||
movie_repository,
|
||||
@@ -302,6 +297,36 @@ async fn wire_postgres(database_url: &str) -> anyhow::Result<(
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
|
||||
enum DbPool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
Sqlite(sqlx::SqlitePool),
|
||||
#[cfg(feature = "postgres")]
|
||||
Postgres(sqlx::PgPool),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum EventBusBackend {
|
||||
Db,
|
||||
#[cfg(feature = "nats")]
|
||||
Nats,
|
||||
}
|
||||
|
||||
impl EventBusBackend {
|
||||
fn from_env() -> anyhow::Result<Self> {
|
||||
match std::env::var("EVENT_BUS_BACKEND")
|
||||
.unwrap_or_else(|_| "db".to_string())
|
||||
.as_str()
|
||||
{
|
||||
"db" => Ok(Self::Db),
|
||||
#[cfg(feature = "nats")]
|
||||
"nats" => Ok(Self::Nats),
|
||||
#[cfg(not(feature = "nats"))]
|
||||
"nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"),
|
||||
other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn init_tracing() {
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::EnvFilter::new(
|
||||
|
||||
@@ -5,8 +5,9 @@ edition = "2024"
|
||||
|
||||
[features]
|
||||
default = ["sqlite"]
|
||||
sqlite = ["dep:sqlite"]
|
||||
postgres = ["dep:postgres"]
|
||||
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
||||
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
||||
nats = ["dep:nats"]
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
@@ -29,8 +30,11 @@ metadata = { workspace = true }
|
||||
poster-fetcher = { workspace = true }
|
||||
poster-storage = { workspace = true }
|
||||
export = { workspace = true }
|
||||
nats = { workspace = true, optional = true }
|
||||
sqlx = { workspace = true }
|
||||
|
||||
# Optional — database backends
|
||||
sqlite = { workspace = true, optional = true }
|
||||
sqlite = { workspace = true, optional = true }
|
||||
postgres = { workspace = true, optional = true }
|
||||
sqlite-event-queue = { workspace = true, optional = true }
|
||||
postgres-event-queue = { workspace = true, optional = true }
|
||||
|
||||
@@ -4,7 +4,6 @@ use std::str::FromStr;
|
||||
use anyhow::Context;
|
||||
use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService};
|
||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||
use event_publisher::{EventPublisherConfig, create_event_channel};
|
||||
use export::ExportAdapter;
|
||||
use metadata::MetadataClientImpl;
|
||||
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
|
||||
@@ -52,27 +51,42 @@ async fn main() -> anyhow::Result<()> {
|
||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||
|
||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
||||
(Arc<dyn MovieRepository>, Arc<dyn ReviewRepository>, Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>, Arc<dyn UserRepository>) =
|
||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) =
|
||||
match backend.as_str() {
|
||||
#[cfg(feature = "postgres")]
|
||||
"postgres" => {
|
||||
let (_, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||
(m, r, d, s, u)
|
||||
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Postgres(pool))
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => {
|
||||
let (_, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||
(m, r, d, s, u)
|
||||
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Sqlite(pool))
|
||||
}
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
|
||||
};
|
||||
|
||||
let (event_publisher_arc, consumer) = {
|
||||
let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env());
|
||||
(Arc::new(publisher) as Arc<dyn domain::ports::EventPublisher>, consumer)
|
||||
let (event_publisher_arc, consumer_arc): (
|
||||
Arc<dyn domain::ports::EventPublisher>,
|
||||
Arc<dyn domain::ports::EventConsumer>,
|
||||
) = match EventBusBackend::from_env()? {
|
||||
EventBusBackend::Db => {
|
||||
tracing::info!("event bus: DB queue");
|
||||
match db_pool {
|
||||
#[cfg(feature = "postgres")]
|
||||
DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool).await?,
|
||||
#[cfg(feature = "sqlite")]
|
||||
DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool).await?,
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "nats")]
|
||||
EventBusBackend::Nats => {
|
||||
let cfg = nats::NatsConfig::from_env()
|
||||
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
|
||||
tracing::info!("event bus: NATS ({})", cfg.url);
|
||||
nats::create_channel(cfg).await?
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = AppContext {
|
||||
@@ -92,7 +106,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let poster_handler = Arc::new(PosterSyncHandler::new(ctx, 3));
|
||||
let worker = WorkerService::new(Arc::new(consumer), vec![poster_handler]);
|
||||
let worker = WorkerService::new(consumer_arc, vec![poster_handler]);
|
||||
|
||||
tracing::info!("worker started");
|
||||
worker.run().await;
|
||||
@@ -101,10 +115,41 @@ async fn main() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
enum DbPool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
Sqlite(sqlx::SqlitePool),
|
||||
#[cfg(feature = "postgres")]
|
||||
Postgres(sqlx::PgPool),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum EventBusBackend {
|
||||
Db,
|
||||
#[cfg(feature = "nats")]
|
||||
Nats,
|
||||
}
|
||||
|
||||
impl EventBusBackend {
|
||||
fn from_env() -> anyhow::Result<Self> {
|
||||
match std::env::var("EVENT_BUS_BACKEND")
|
||||
.unwrap_or_else(|_| "db".to_string())
|
||||
.as_str()
|
||||
{
|
||||
"db" => Ok(Self::Db),
|
||||
#[cfg(feature = "nats")]
|
||||
"nats" => Ok(Self::Nats),
|
||||
#[cfg(not(feature = "nats"))]
|
||||
"nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"),
|
||||
other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn init_tracing() {
|
||||
let filter = std::env::var("RUST_LOG")
|
||||
.unwrap_or_else(|_| "worker=info,application=info".to_string());
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "info".into()))
|
||||
.with(tracing_subscriber::EnvFilter::new(filter))
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
}
|
||||
|
||||
@@ -3,5 +3,7 @@ set -euo pipefail
|
||||
|
||||
IMAGE="registry.gabrielkaszewski.dev/movies-diary:latest"
|
||||
|
||||
docker buildx build --platform linux/amd64 -t "$IMAGE" --push .
|
||||
docker buildx build --platform linux/amd64 \
|
||||
--build-arg FEATURES=sqlite,sqlite-federation,nats \
|
||||
-t "$IMAGE" --push .
|
||||
echo "pushed $IMAGE"
|
||||
|
||||
Reference in New Issue
Block a user