From 0e9911ebfc6b70ee1467c68f9475f055b177d14f Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 11:50:16 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20event=20infrastructure=20=E2=80=94=20pa?= =?UTF-8?q?yload,=20transport,=20NATS=20adapter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - EventPublisher now takes &DomainEvent (11 call sites + 3 impls updated) - EventEnvelope + EventConsumer port in domain - event-payload: serializable DomainEvent mirror with subject routing - event-transport: generic Transport/MessageSource traits, publisher/consumer adapters - adapters-nats: JetStream publish + durable pull consumer --- Cargo.lock | 507 +++++++++++++++++- Cargo.toml | 8 + crates/adapters/event-payload/Cargo.toml | 11 + crates/adapters/event-payload/src/lib.rs | 252 +++++++++ crates/adapters/event-payload/src/tests.rs | 89 +++ crates/adapters/event-transport/Cargo.toml | 15 + crates/adapters/event-transport/src/lib.rs | 99 ++++ crates/adapters/event-transport/src/tests.rs | 61 +++ crates/adapters/nats/Cargo.toml | 13 + crates/adapters/nats/src/lib.rs | 214 ++++++++ .../src/catalog/commands/register_asset.rs | 2 +- .../src/catalog/commands/update_metadata.rs | 2 +- .../src/processing/commands/complete_job.rs | 2 +- .../src/processing/commands/enqueue_job.rs | 2 +- .../processing/commands/execute_pipeline.rs | 4 +- .../src/processing/commands/fail_job.rs | 4 +- .../src/sharing/commands/revoke_share.rs | 2 +- .../src/sharing/commands/share_resource.rs | 2 +- .../src/storage/commands/ingest_asset.rs | 2 +- crates/application/src/testing/fakes.rs | 4 +- crates/bootstrap/src/log_event_publisher.rs | 2 +- crates/domain/src/common/events.rs | 7 + crates/domain/src/common/ports.rs | 9 +- crates/worker/src/main.rs | 2 +- 24 files changed, 1294 insertions(+), 21 deletions(-) create mode 100644 crates/adapters/event-payload/Cargo.toml create mode 100644 crates/adapters/event-payload/src/lib.rs create mode 100644 crates/adapters/event-payload/src/tests.rs create mode 100644 crates/adapters/event-transport/Cargo.toml create mode 100644 crates/adapters/event-transport/src/lib.rs create mode 100644 crates/adapters/event-transport/src/tests.rs create mode 100644 crates/adapters/nats/Cargo.toml create mode 100644 crates/adapters/nats/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 78fd7e7..d860935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "adapters-nats" +version = "0.1.0" +dependencies = [ + "async-nats", + "async-trait", + "domain", + "event-transport", + "futures", + "tokio", + "tracing", +] + [[package]] name = "adapters-postgres" version = "0.1.0" @@ -105,6 +118,42 @@ dependencies = [ "uuid", ] +[[package]] +name = "async-nats" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31811585c7c5bc2f60f8b80d5a6b0f737115611dac47567d7f7d94562ebb180b" +dependencies = [ + "base64", + "bytes", + "futures-util", + "memchr", + "nkeys", + "nuid", + "pin-project", + "portable-atomic", + "rand 0.10.1", + "regex", + "ring", + "rustls-native-certs", + "rustls-pki-types", + "rustls-webpki", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror", + "time", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -292,6 +341,9 @@ name = "bytes" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -315,6 +367,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chrono" version = "0.4.42" @@ -379,6 +442,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.4.0" @@ -419,6 +491,38 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "data-encoding" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" + [[package]] name = "der" version = "0.7.10" @@ -437,6 +541,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", + "serde_core", ] [[package]] @@ -482,6 +587,28 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2", + "signature", + "subtle", +] + [[package]] name = "either" version = "1.15.0" @@ -528,6 +655,36 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-payload" +version = "0.1.0" +dependencies = [ + "chrono", + "domain", + "serde", + "serde_json", + "uuid", +] + +[[package]] +name = "event-transport" +version = "0.1.0" +dependencies = [ + "async-trait", + "domain", + "event-payload", + "futures", + "serde_json", + "tokio", + "tracing", +] + +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "find-msvc-tools" version = "0.1.6" @@ -697,11 +854,25 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.3.0", "wasip2", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "rand_core 0.10.1", + "wasip2", + "wasip3", +] + [[package]] name = "h2" version = "0.4.14" @@ -1003,6 +1174,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "idna" version = "1.1.0" @@ -1100,6 +1277,12 @@ dependencies = [ "spin", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.178" @@ -1226,6 +1409,21 @@ dependencies = [ "version_check", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.17", + "log", + "rand 0.8.6", + "signatory", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1235,6 +1433,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.6", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -1394,6 +1601,26 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "pin-project" +version = "1.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2466b2336ed02bcdca6b294417127b90ec92038d1d5c4fbeac971a922e0e0924" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c96395f0a926bc13b1c17622aaddda1ecb55d49c8f1bf9777e4d877800a43f8b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -1433,6 +1660,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.4" @@ -1477,6 +1710,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.104" @@ -1566,6 +1809,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "rand" version = "0.8.6" @@ -1587,6 +1836,17 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -1625,6 +1885,12 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "redox_syscall" version = "0.5.18" @@ -1754,6 +2020,15 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustls" version = "0.23.40" @@ -1869,6 +2144,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" + [[package]] name = "serde" version = "1.0.228" @@ -1912,6 +2193,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.20" @@ -1923,6 +2213,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1942,7 +2243,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -1953,7 +2254,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -1972,6 +2273,18 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -2454,6 +2767,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-sink", + "http", + "httparse", + "rand 0.8.6", + "ring", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tokio-util", + "webpki-roots 0.26.11", +] + [[package]] name = "tower" version = "0.5.3" @@ -2569,6 +2903,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "typenum" version = "1.20.0" @@ -2602,6 +2946,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -2724,7 +3074,16 @@ version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.46.0", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", ] [[package]] @@ -2791,6 +3150,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.2" @@ -2804,6 +3185,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "web-sys" version = "0.3.83" @@ -2824,6 +3217,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.7", +] + +[[package]] +name = "webpki-roots" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "whoami" version = "1.6.1" @@ -3056,6 +3467,94 @@ version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "worker" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cf98bcb..7943174 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,9 @@ members = [ "crates/adapters/postgres", "crates/adapters/auth", "crates/adapters/storage", + "crates/adapters/event-payload", + "crates/adapters/event-transport", + "crates/adapters/nats", "crates/presentation", "crates/bootstrap", "crates/worker", @@ -40,4 +43,9 @@ application = { path = "crates/application" } api-types = { path = "crates/api-types" } adapters-auth = { path = "crates/adapters/auth" } adapters-storage = { path = "crates/adapters/storage" } +event-payload = { path = "crates/adapters/event-payload" } +event-transport = { path = "crates/adapters/event-transport" } +adapters-nats = { path = "crates/adapters/nats" } +async-nats = "0.48" +async-stream = "0.3" presentation = { path = "crates/presentation" } diff --git a/crates/adapters/event-payload/Cargo.toml b/crates/adapters/event-payload/Cargo.toml new file mode 100644 index 0000000..acd7a9e --- /dev/null +++ b/crates/adapters/event-payload/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "event-payload" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs new file mode 100644 index 0000000..2e0c523 --- /dev/null +++ b/crates/adapters/event-payload/src/lib.rs @@ -0,0 +1,252 @@ +use domain::{errors::DomainError, events::DomainEvent, value_objects::SystemId}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", content = "data")] +pub enum EventPayload { + AssetIngested { + asset_id: String, + owner_user_id: String, + timestamp: String, + }, + MetadataUpdated { + asset_id: String, + updated_by: String, + timestamp: String, + }, + AssetDeleted { + asset_id: String, + deleted_by: String, + timestamp: String, + }, + ShareCreated { + scope_id: String, + shareable_id: String, + created_by: String, + timestamp: String, + }, + ShareRevoked { + scope_id: String, + revoked_by: String, + timestamp: String, + }, + SidecarSyncRequested { + asset_id: String, + timestamp: String, + }, + JobEnqueued { + job_id: String, + job_type: String, + timestamp: String, + }, + JobCompleted { + job_id: String, + timestamp: String, + }, + JobFailed { + job_id: String, + error: String, + timestamp: String, + }, +} + +impl EventPayload { + pub fn subject(&self) -> &'static str { + match self { + Self::AssetIngested { .. } => "assets.ingested", + Self::MetadataUpdated { .. } => "metadata.updated", + Self::AssetDeleted { .. } => "assets.deleted", + Self::ShareCreated { .. } => "shares.created", + Self::ShareRevoked { .. } => "shares.revoked", + Self::SidecarSyncRequested { .. } => "sidecars.sync_requested", + Self::JobEnqueued { .. } => "jobs.enqueued", + Self::JobCompleted { .. } => "jobs.completed", + Self::JobFailed { .. } => "jobs.failed", + } + } +} + +impl From<&DomainEvent> for EventPayload { + fn from(e: &DomainEvent) -> Self { + match e { + DomainEvent::AssetIngested { + asset_id, + owner_user_id, + timestamp, + } => Self::AssetIngested { + asset_id: asset_id.to_string(), + owner_user_id: owner_user_id.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::MetadataUpdated { + asset_id, + updated_by, + timestamp, + } => Self::MetadataUpdated { + asset_id: asset_id.to_string(), + updated_by: updated_by.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::AssetDeleted { + asset_id, + deleted_by, + timestamp, + } => Self::AssetDeleted { + asset_id: asset_id.to_string(), + deleted_by: deleted_by.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::ShareCreated { + scope_id, + shareable_id, + created_by, + timestamp, + } => Self::ShareCreated { + scope_id: scope_id.to_string(), + shareable_id: shareable_id.to_string(), + created_by: created_by.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::ShareRevoked { + scope_id, + revoked_by, + timestamp, + } => Self::ShareRevoked { + scope_id: scope_id.to_string(), + revoked_by: revoked_by.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::SidecarSyncRequested { + asset_id, + timestamp, + } => Self::SidecarSyncRequested { + asset_id: asset_id.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::JobEnqueued { + job_id, + job_type, + timestamp, + } => Self::JobEnqueued { + job_id: job_id.to_string(), + job_type: job_type.clone(), + timestamp: timestamp.to_string(), + }, + DomainEvent::JobCompleted { job_id, timestamp } => Self::JobCompleted { + job_id: job_id.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::JobFailed { + job_id, + error, + timestamp, + } => Self::JobFailed { + job_id: job_id.to_string(), + error: error.clone(), + timestamp: timestamp.to_string(), + }, + } + } +} + +fn parse_uuid(s: &str, field: &str) -> Result { + uuid::Uuid::parse_str(s) + .map_err(|_| DomainError::Internal(format!("invalid uuid for {field}: {s}"))) +} + +fn parse_timestamp(s: &str) -> Result { + use chrono::DateTime; + let dt = DateTime::parse_from_rfc3339(s) + .map_err(|_| DomainError::Internal(format!("invalid timestamp: {s}")))?; + Ok(domain::value_objects::DateTimeStamp::from_datetime( + dt.with_timezone(&chrono::Utc), + )) +} + +impl TryFrom for DomainEvent { + type Error = DomainError; + + fn try_from(p: EventPayload) -> Result { + Ok(match p { + EventPayload::AssetIngested { + asset_id, + owner_user_id, + timestamp, + } => DomainEvent::AssetIngested { + asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?), + owner_user_id: SystemId::from_uuid(parse_uuid(&owner_user_id, "owner_user_id")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::MetadataUpdated { + asset_id, + updated_by, + timestamp, + } => DomainEvent::MetadataUpdated { + asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?), + updated_by: SystemId::from_uuid(parse_uuid(&updated_by, "updated_by")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::AssetDeleted { + asset_id, + deleted_by, + timestamp, + } => DomainEvent::AssetDeleted { + asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?), + deleted_by: SystemId::from_uuid(parse_uuid(&deleted_by, "deleted_by")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::ShareCreated { + scope_id, + shareable_id, + created_by, + timestamp, + } => DomainEvent::ShareCreated { + scope_id: SystemId::from_uuid(parse_uuid(&scope_id, "scope_id")?), + shareable_id: SystemId::from_uuid(parse_uuid(&shareable_id, "shareable_id")?), + created_by: SystemId::from_uuid(parse_uuid(&created_by, "created_by")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::ShareRevoked { + scope_id, + revoked_by, + timestamp, + } => DomainEvent::ShareRevoked { + scope_id: SystemId::from_uuid(parse_uuid(&scope_id, "scope_id")?), + revoked_by: SystemId::from_uuid(parse_uuid(&revoked_by, "revoked_by")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::SidecarSyncRequested { + asset_id, + timestamp, + } => DomainEvent::SidecarSyncRequested { + asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::JobEnqueued { + job_id, + job_type, + timestamp, + } => DomainEvent::JobEnqueued { + job_id: SystemId::from_uuid(parse_uuid(&job_id, "job_id")?), + job_type, + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::JobCompleted { job_id, timestamp } => DomainEvent::JobCompleted { + job_id: SystemId::from_uuid(parse_uuid(&job_id, "job_id")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::JobFailed { + job_id, + error, + timestamp, + } => DomainEvent::JobFailed { + job_id: SystemId::from_uuid(parse_uuid(&job_id, "job_id")?), + error, + timestamp: parse_timestamp(×tamp)?, + }, + }) + } +} + +#[cfg(test)] +mod tests; diff --git a/crates/adapters/event-payload/src/tests.rs b/crates/adapters/event-payload/src/tests.rs new file mode 100644 index 0000000..b797ecb --- /dev/null +++ b/crates/adapters/event-payload/src/tests.rs @@ -0,0 +1,89 @@ +use crate::EventPayload; +use domain::{events::DomainEvent, value_objects::SystemId}; + +fn make_timestamp() -> domain::value_objects::DateTimeStamp { + domain::value_objects::DateTimeStamp::now() +} + +#[test] +fn subject_mapping() { + let cases = vec![ + ( + DomainEvent::AssetIngested { + asset_id: SystemId::new(), + owner_user_id: SystemId::new(), + timestamp: make_timestamp(), + }, + "assets.ingested", + ), + ( + DomainEvent::JobEnqueued { + job_id: SystemId::new(), + job_type: "extract_metadata".into(), + timestamp: make_timestamp(), + }, + "jobs.enqueued", + ), + ( + DomainEvent::JobFailed { + job_id: SystemId::new(), + error: "boom".into(), + timestamp: make_timestamp(), + }, + "jobs.failed", + ), + ]; + + for (event, expected_subject) in cases { + let payload = EventPayload::from(&event); + assert_eq!(payload.subject(), expected_subject); + } +} + +#[test] +fn roundtrip_asset_ingested() { + let id = SystemId::new(); + let owner = SystemId::new(); + let event = DomainEvent::AssetIngested { + asset_id: id, + owner_user_id: owner, + timestamp: make_timestamp(), + }; + + let payload = EventPayload::from(&event); + let json = serde_json::to_vec(&payload).unwrap(); + let back: EventPayload = serde_json::from_slice(&json).unwrap(); + let restored = DomainEvent::try_from(back).unwrap(); + + if let DomainEvent::AssetIngested { + asset_id, + owner_user_id, + .. + } = restored + { + assert_eq!(asset_id, id); + assert_eq!(owner_user_id, owner); + } else { + panic!("wrong variant"); + } +} + +#[test] +fn roundtrip_job_failed() { + let jid = SystemId::new(); + let event = DomainEvent::JobFailed { + job_id: jid, + error: "plugin crashed".into(), + timestamp: make_timestamp(), + }; + + let payload = EventPayload::from(&event); + let back = DomainEvent::try_from(payload).unwrap(); + + if let DomainEvent::JobFailed { job_id, error, .. } = back { + assert_eq!(job_id, jid); + assert_eq!(error, "plugin crashed"); + } else { + panic!("wrong variant"); + } +} diff --git a/crates/adapters/event-transport/Cargo.toml b/crates/adapters/event-transport/Cargo.toml new file mode 100644 index 0000000..6fcc3cb --- /dev/null +++ b/crates/adapters/event-transport/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "event-transport" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +event-payload = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +futures = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/adapters/event-transport/src/lib.rs b/crates/adapters/event-transport/src/lib.rs new file mode 100644 index 0000000..fd6d87d --- /dev/null +++ b/crates/adapters/event-transport/src/lib.rs @@ -0,0 +1,99 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::{DomainEvent, EventEnvelope}, + ports::{EventConsumer, EventPublisher}, +}; +use event_payload::EventPayload; +use futures::stream::BoxStream; + +#[async_trait] +pub trait Transport: Send + Sync { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>; +} + +pub struct EventPublisherAdapter { + transport: T, +} + +impl EventPublisherAdapter { + pub fn new(transport: T) -> Self { + Self { transport } + } +} + +#[async_trait] +impl EventPublisher for EventPublisherAdapter { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + let payload = EventPayload::from(event); + let subject = payload.subject(); + let bytes = + serde_json::to_vec(&payload).map_err(|e| DomainError::Internal(e.to_string()))?; + tracing::debug!(subject, "publishing event"); + self.transport.publish_bytes(subject, &bytes).await + } +} + +pub struct RawMessage { + pub subject: String, + pub payload: Vec, + pub delivery_count: u64, + pub ack: Box, + pub nack: Box, +} + +pub trait MessageSource: Send + Sync { + fn messages(&self) -> BoxStream<'_, Result>; +} + +pub struct EventConsumerAdapter { + source: S, +} + +impl EventConsumerAdapter { + pub fn new(source: S) -> Self { + Self { source } + } +} + +impl EventConsumer for EventConsumerAdapter { + fn consume(&self) -> BoxStream<'_, Result> { + use futures::StreamExt; + let stream = self.source.messages(); + Box::pin(stream.filter_map(|result| async move { + match result { + Err(e) => { + tracing::warn!("transport error: {e}"); + None + } + Ok(msg) => { + let payload = match serde_json::from_slice::(&msg.payload) { + Ok(p) => p, + Err(e) => { + tracing::warn!("failed to deserialize event payload, acking: {e}"); + (msg.ack)(); + return None; + } + }; + let event = match DomainEvent::try_from(payload) { + Ok(e) => e, + Err(e) => { + tracing::warn!("unknown event type, acking: {e}"); + (msg.ack)(); + return None; + } + }; + Some(Ok(EventEnvelope { + event, + delivery_count: msg.delivery_count, + ack: msg.ack, + nack: msg.nack, + })) + } + } + })) + } +} + +#[cfg(test)] +mod tests; diff --git a/crates/adapters/event-transport/src/tests.rs b/crates/adapters/event-transport/src/tests.rs new file mode 100644 index 0000000..0c1a453 --- /dev/null +++ b/crates/adapters/event-transport/src/tests.rs @@ -0,0 +1,61 @@ +use crate::{EventPublisherAdapter, Transport}; +use async_trait::async_trait; +use domain::{ + errors::DomainError, events::DomainEvent, ports::EventPublisher, value_objects::SystemId, +}; +use std::sync::{Arc, Mutex}; + +struct RecordingTransport { + messages: Arc)>>>, +} + +#[async_trait] +impl Transport for RecordingTransport { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { + self.messages + .lock() + .unwrap() + .push((subject.to_string(), bytes.to_vec())); + Ok(()) + } +} + +#[tokio::test] +async fn adapter_publishes_with_correct_subject() { + let messages = Arc::new(Mutex::new(Vec::new())); + let adapter = EventPublisherAdapter::new(RecordingTransport { + messages: messages.clone(), + }); + + let event = DomainEvent::JobCompleted { + job_id: SystemId::new(), + timestamp: domain::value_objects::DateTimeStamp::now(), + }; + + adapter.publish(&event).await.unwrap(); + + let recorded = messages.lock().unwrap(); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].0, "jobs.completed"); +} + +#[tokio::test] +async fn published_bytes_are_valid_json() { + let messages = Arc::new(Mutex::new(Vec::new())); + let adapter = EventPublisherAdapter::new(RecordingTransport { + messages: messages.clone(), + }); + + let event = DomainEvent::AssetIngested { + asset_id: SystemId::new(), + owner_user_id: SystemId::new(), + timestamp: domain::value_objects::DateTimeStamp::now(), + }; + + adapter.publish(&event).await.unwrap(); + + let recorded = messages.lock().unwrap(); + let payload: event_payload::EventPayload = + serde_json::from_slice(&recorded[0].1).expect("should be valid JSON"); + assert_eq!(payload.subject(), "assets.ingested"); +} diff --git a/crates/adapters/nats/Cargo.toml b/crates/adapters/nats/Cargo.toml new file mode 100644 index 0000000..55c62cd --- /dev/null +++ b/crates/adapters/nats/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "adapters-nats" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +event-transport = { workspace = true } +async-nats = { workspace = true } +async-trait = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs new file mode 100644 index 0000000..d02f642 --- /dev/null +++ b/crates/adapters/nats/src/lib.rs @@ -0,0 +1,214 @@ +use async_nats::jetstream::{self, AckKind, stream::Config as StreamConfig}; +use async_trait::async_trait; +use domain::errors::DomainError; +use event_transport::{MessageSource, RawMessage, Transport}; +use futures::stream::BoxStream; +use std::sync::Arc; + +const STREAM_NAME: &str = "KPHOTOS_EVENTS"; +const STREAM_SUBJECT: &str = "kphotos-events.>"; +const CONSUMER_NAME: &str = "worker"; +const MAX_MESSAGES: i64 = 100_000; + +pub const CONSUMER_MAX_DELIVER: i64 = 5; +const CONSUMER_ACK_WAIT_SECS: u64 = 30; +const ACK_TASK_TIMEOUT_SECS: u64 = 5; + +fn stream_config() -> StreamConfig { + StreamConfig { + name: STREAM_NAME.to_string(), + subjects: vec![STREAM_SUBJECT.to_string()], + max_messages: MAX_MESSAGES, + ..Default::default() + } +} + +pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> { + let js = jetstream::new(client.clone()); + + if js.update_stream(stream_config()).await.is_ok() { + tracing::info!(subject = STREAM_SUBJECT, "JetStream stream updated"); + return Ok(()); + } + + tracing::warn!( + "JetStream stream update failed (incompatible config), deleting '{STREAM_NAME}' and recreating" + ); + let _ = js.delete_stream(STREAM_NAME).await; + + js.create_stream(stream_config()) + .await + .map(|_| ()) + .map_err(|e| DomainError::Internal(format!("JetStream stream create failed: {e}"))) +} + +pub struct NatsTransport { + jetstream: jetstream::Context, +} + +impl NatsTransport { + pub fn new(client: async_nats::Client) -> Self { + Self { + jetstream: jetstream::new(client), + } + } +} + +#[async_trait] +impl Transport for NatsTransport { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { + let full_subject = format!("kphotos-events.{subject}"); + self.jetstream + .publish(full_subject, bytes.to_vec().into()) + .await + .map_err(|e| DomainError::Internal(e.to_string()))? + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +pub struct NatsMessageSource { + jetstream: jetstream::Context, +} + +impl NatsMessageSource { + pub fn new(client: async_nats::Client) -> Self { + Self { + jetstream: jetstream::new(client), + } + } +} + +impl MessageSource for NatsMessageSource { + fn messages(&self) -> BoxStream<'_, Result> { + use futures::stream; + use tokio::sync::Mutex as TokioMutex; + + let js = self.jetstream.clone(); + let (tx, rx) = tokio::sync::mpsc::channel::>(128); + + tokio::spawn(async move { + let stream = match js.get_stream(STREAM_NAME).await { + Ok(s) => s, + Err(e) => { + let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; + return; + } + }; + + if let Ok(info) = stream.consumer_info(CONSUMER_NAME).await + && info.config.deliver_subject.is_some() + { + tracing::info!( + "deleting old push consumer '{CONSUMER_NAME}', replacing with pull" + ); + let _ = stream.delete_consumer(CONSUMER_NAME).await; + } + + let consumer = match stream + .get_or_create_consumer( + CONSUMER_NAME, + jetstream::consumer::pull::Config { + durable_name: Some(CONSUMER_NAME.to_string()), + deliver_policy: jetstream::consumer::DeliverPolicy::New, + ack_policy: jetstream::consumer::AckPolicy::Explicit, + ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS), + max_deliver: CONSUMER_MAX_DELIVER, + ..Default::default() + }, + ) + .await + { + Ok(c) => c, + Err(e) => { + let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; + return; + } + }; + + tracing::info!("NATS pull consumer ready"); + + loop { + let mut messages = match consumer.messages().await { + Ok(m) => m, + Err(e) => { + tracing::error!("NATS messages() failed: {e}"); + let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; + return; + } + }; + + use futures::StreamExt; + while let Some(result) = messages.next().await { + let msg = match result { + Ok(m) => m, + Err(e) => { + tracing::warn!("NATS message error: {e}"); + continue; + } + }; + + let subject = msg.subject.to_string(); + let payload = msg.payload.to_vec(); + let delivery_count = msg + .info() + .map(|info| info.delivered.max(0) as u64) + .unwrap_or(1); + let msg = Arc::new(msg); + let msg_nack = Arc::clone(&msg); + + let raw = RawMessage { + subject, + payload, + delivery_count, + ack: Box::new(move || { + let m = Arc::clone(&msg); + tokio::spawn(async move { + let result = tokio::time::timeout( + std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS), + m.ack(), + ) + .await; + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"), + Err(_) => tracing::warn!( + "NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s" + ), + } + }); + }), + nack: Box::new(move || { + let m = Arc::clone(&msg_nack); + tokio::spawn(async move { + let result = tokio::time::timeout( + std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS), + m.ack_with(AckKind::Nak(None)), + ) + .await; + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"), + Err(_) => tracing::warn!( + "NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s" + ), + } + }); + }), + }; + + if tx.send(Ok(raw)).await.is_err() { + return; + } + } + } + }); + + let rx = Arc::new(TokioMutex::new(rx)); + Box::pin(stream::unfold(rx, |rx| async move { + let item = rx.lock().await.recv().await?; + Some((item, rx)) + })) + } +} diff --git a/crates/application/src/catalog/commands/register_asset.rs b/crates/application/src/catalog/commands/register_asset.rs index 3a2b89e..fe2a11f 100644 --- a/crates/application/src/catalog/commands/register_asset.rs +++ b/crates/application/src/catalog/commands/register_asset.rs @@ -69,7 +69,7 @@ impl RegisterAssetHandler { }; self.event_pub - .publish(DomainEvent::AssetIngested { + .publish(&DomainEvent::AssetIngested { asset_id: asset.asset_id, owner_user_id: asset.owner_user_id, timestamp: DateTimeStamp::now(), diff --git a/crates/application/src/catalog/commands/update_metadata.rs b/crates/application/src/catalog/commands/update_metadata.rs index 41323c0..fe4b90d 100644 --- a/crates/application/src/catalog/commands/update_metadata.rs +++ b/crates/application/src/catalog/commands/update_metadata.rs @@ -43,7 +43,7 @@ impl UpdateMetadataHandler { self.metadata_repo.save(&metadata).await?; self.event_pub - .publish(DomainEvent::MetadataUpdated { + .publish(&DomainEvent::MetadataUpdated { asset_id: cmd.asset_id, updated_by: cmd.user_id, timestamp: DateTimeStamp::now(), diff --git a/crates/application/src/processing/commands/complete_job.rs b/crates/application/src/processing/commands/complete_job.rs index a208d16..d17de35 100644 --- a/crates/application/src/processing/commands/complete_job.rs +++ b/crates/application/src/processing/commands/complete_job.rs @@ -49,7 +49,7 @@ impl CompleteJobHandler { self.batch_repo.save(&batch).await?; } self.event_pub - .publish(DomainEvent::JobCompleted { + .publish(&DomainEvent::JobCompleted { job_id: job.job_id, timestamp: DateTimeStamp::now(), }) diff --git a/crates/application/src/processing/commands/enqueue_job.rs b/crates/application/src/processing/commands/enqueue_job.rs index 9e6b4bc..8709a40 100644 --- a/crates/application/src/processing/commands/enqueue_job.rs +++ b/crates/application/src/processing/commands/enqueue_job.rs @@ -39,7 +39,7 @@ impl EnqueueJobHandler { } self.job_repo.save(&job).await?; self.event_pub - .publish(DomainEvent::JobEnqueued { + .publish(&DomainEvent::JobEnqueued { job_id: job.job_id, job_type: format!("{:?}", cmd.job_type), timestamp: DateTimeStamp::now(), diff --git a/crates/application/src/processing/commands/execute_pipeline.rs b/crates/application/src/processing/commands/execute_pipeline.rs index b12b734..d277561 100644 --- a/crates/application/src/processing/commands/execute_pipeline.rs +++ b/crates/application/src/processing/commands/execute_pipeline.rs @@ -77,7 +77,7 @@ impl ExecutePipelineHandler { self.job_repo.save(&job).await?; self.update_batch_on_complete(&job).await?; self.event_pub - .publish(DomainEvent::JobCompleted { + .publish(&DomainEvent::JobCompleted { job_id: job.job_id, timestamp: DateTimeStamp::now(), }) @@ -89,7 +89,7 @@ impl ExecutePipelineHandler { self.job_repo.save(&job).await?; self.update_batch_on_fail(&job).await?; self.event_pub - .publish(DomainEvent::JobFailed { + .publish(&DomainEvent::JobFailed { job_id: job.job_id, error: error_msg, timestamp: DateTimeStamp::now(), diff --git a/crates/application/src/processing/commands/fail_job.rs b/crates/application/src/processing/commands/fail_job.rs index e3c3c30..7dc6639 100644 --- a/crates/application/src/processing/commands/fail_job.rs +++ b/crates/application/src/processing/commands/fail_job.rs @@ -49,7 +49,7 @@ impl FailJobHandler { self.batch_repo.save(&batch).await?; } self.event_pub - .publish(DomainEvent::JobFailed { + .publish(&DomainEvent::JobFailed { job_id: job.job_id, error: cmd.error, timestamp: DateTimeStamp::now(), @@ -57,7 +57,7 @@ impl FailJobHandler { .await?; } else if job.status == JobStatus::Queued { self.event_pub - .publish(DomainEvent::JobEnqueued { + .publish(&DomainEvent::JobEnqueued { job_id: job.job_id, job_type: format!("{:?}", job.job_type), timestamp: DateTimeStamp::now(), diff --git a/crates/application/src/sharing/commands/revoke_share.rs b/crates/application/src/sharing/commands/revoke_share.rs index 41261ea..1c375b5 100644 --- a/crates/application/src/sharing/commands/revoke_share.rs +++ b/crates/application/src/sharing/commands/revoke_share.rs @@ -36,7 +36,7 @@ impl RevokeShareHandler { self.share_repo.delete_scope(&cmd.scope_id).await?; self.event_pub - .publish(DomainEvent::ShareRevoked { + .publish(&DomainEvent::ShareRevoked { scope_id: cmd.scope_id, revoked_by: cmd.revoked_by, timestamp: DateTimeStamp::now(), diff --git a/crates/application/src/sharing/commands/share_resource.rs b/crates/application/src/sharing/commands/share_resource.rs index 1771251..1ed075d 100644 --- a/crates/application/src/sharing/commands/share_resource.rs +++ b/crates/application/src/sharing/commands/share_resource.rs @@ -51,7 +51,7 @@ impl ShareResourceHandler { self.share_repo.save_target(&target).await?; self.event_pub - .publish(DomainEvent::ShareCreated { + .publish(&DomainEvent::ShareCreated { scope_id: scope.scope_id, shareable_id: cmd.shareable_id, created_by: cmd.created_by, diff --git a/crates/application/src/storage/commands/ingest_asset.rs b/crates/application/src/storage/commands/ingest_asset.rs index 9f5599a..6941a74 100644 --- a/crates/application/src/storage/commands/ingest_asset.rs +++ b/crates/application/src/storage/commands/ingest_asset.rs @@ -145,7 +145,7 @@ impl IngestAssetHandler { self.ledger_repo.record(&entry).await?; self.event_pub - .publish(DomainEvent::AssetIngested { + .publish(&DomainEvent::AssetIngested { asset_id: asset.asset_id, owner_user_id: cmd.uploader_id, timestamp: DateTimeStamp::now(), diff --git a/crates/application/src/testing/fakes.rs b/crates/application/src/testing/fakes.rs index 7832e99..81c713e 100644 --- a/crates/application/src/testing/fakes.rs +++ b/crates/application/src/testing/fakes.rs @@ -37,8 +37,8 @@ impl Default for StubEventPublisher { #[async_trait] impl EventPublisher for StubEventPublisher { - async fn publish(&self, event: DomainEvent) -> Result<(), DomainError> { - self.events.lock().await.push(event); + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + self.events.lock().await.push(event.clone()); Ok(()) } } diff --git a/crates/bootstrap/src/log_event_publisher.rs b/crates/bootstrap/src/log_event_publisher.rs index 188842e..058200f 100644 --- a/crates/bootstrap/src/log_event_publisher.rs +++ b/crates/bootstrap/src/log_event_publisher.rs @@ -5,7 +5,7 @@ pub struct LogEventPublisher; #[async_trait] impl EventPublisher for LogEventPublisher { - async fn publish(&self, event: DomainEvent) -> Result<(), DomainError> { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { tracing::info!(?event, "domain event published"); Ok(()) } diff --git a/crates/domain/src/common/events.rs b/crates/domain/src/common/events.rs index a1fe15b..9f7aa3e 100644 --- a/crates/domain/src/common/events.rs +++ b/crates/domain/src/common/events.rs @@ -1,5 +1,12 @@ use crate::common::value_objects::{DateTimeStamp, SystemId}; +pub struct EventEnvelope { + pub event: DomainEvent, + pub delivery_count: u64, + pub ack: Box, + pub nack: Box, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum DomainEvent { AssetIngested { diff --git a/crates/domain/src/common/ports.rs b/crates/domain/src/common/ports.rs index a72cee8..b2d8729 100644 --- a/crates/domain/src/common/ports.rs +++ b/crates/domain/src/common/ports.rs @@ -1,8 +1,13 @@ use crate::common::errors::DomainError; -use crate::common::events::DomainEvent; +use crate::common::events::{DomainEvent, EventEnvelope}; use async_trait::async_trait; +use futures::stream::BoxStream; #[async_trait] pub trait EventPublisher: Send + Sync { - async fn publish(&self, event: DomainEvent) -> Result<(), DomainError>; + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError>; +} + +pub trait EventConsumer: Send + Sync { + fn consume(&self) -> BoxStream<'_, Result>; } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 42eb770..221b4f5 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -55,7 +55,7 @@ struct LogEventPublisher; impl domain::ports::EventPublisher for LogEventPublisher { async fn publish( &self, - event: domain::events::DomainEvent, + event: &domain::events::DomainEvent, ) -> Result<(), domain::errors::DomainError> { info!(event = ?event, "domain event"); Ok(())