From 696e3e170c250a37950fa832c934f61dc1e8d8d9 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Tue, 12 May 2026 15:05:28 +0200 Subject: [PATCH] feat: async image conversion service (avif/webp) with backfill --- Cargo.lock | 492 +++++++++++++++++- Cargo.toml | 2 + crates/adapters/event-payload/src/lib.rs | 24 + crates/adapters/image-converter/Cargo.toml | 19 + .../adapters/image-converter/src/backfill.rs | 141 +++++ crates/adapters/image-converter/src/config.rs | 90 ++++ .../adapters/image-converter/src/handler.rs | 224 ++++++++ crates/adapters/image-converter/src/lib.rs | 36 ++ crates/adapters/nats/src/subject.rs | 1 + crates/adapters/poster-sync/src/lib.rs | 12 +- crates/adapters/postgres/src/image_ref.rs | 48 ++ crates/adapters/postgres/src/lib.rs | 2 + crates/adapters/sqlite/src/image_ref.rs | 155 ++++++ crates/adapters/sqlite/src/lib.rs | 2 + .../application/src/use_cases/sync_poster.rs | 9 + .../src/use_cases/update_profile.rs | 8 + crates/application/src/worker.rs | 1 + crates/domain/src/events.rs | 3 + crates/domain/src/ports.rs | 6 + crates/worker/Cargo.toml | 1 + crates/worker/src/db.rs | 11 +- crates/worker/src/main.rs | 15 + 22 files changed, 1286 insertions(+), 16 deletions(-) create mode 100644 crates/adapters/image-converter/Cargo.toml create mode 100644 crates/adapters/image-converter/src/backfill.rs create mode 100644 crates/adapters/image-converter/src/config.rs create mode 100644 crates/adapters/image-converter/src/handler.rs create mode 100644 crates/adapters/image-converter/src/lib.rs create mode 100644 crates/adapters/postgres/src/image_ref.rs create mode 100644 crates/adapters/sqlite/src/image_ref.rs diff --git a/Cargo.lock b/Cargo.lock index 25d0737..5da4424 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -264,6 +264,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "aligned-vec" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b" +dependencies = [ + "equator", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -327,6 +336,17 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arg_enum_proc_macro" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ae92a5119aa49cdbcf6b9f893fe4e1d98b04ccbf82ee0584ad948a44a734dea" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "argon2" version = "0.5.3" @@ -339,6 +359,12 @@ dependencies = [ "password-hash", ] +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "askama" version = "0.16.0" @@ -389,7 +415,7 @@ dependencies = [ "serde", "serde_derive", "unicode-ident", - "winnow", + "winnow 1.0.2", ] [[package]] @@ -617,6 +643,29 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "av1-grain" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cfddb07216410377231960af4fcab838eaa12e013417781b78bd95ee22077f8" +dependencies = [ + "anyhow", + "arrayvec", + "log", + "nom 8.0.0", + "num-rational", + "v_frame", +] + +[[package]] +name = "avif-serialize" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7178fe5f7d460b13895ebb9dcb28a3a6216d2df2574a0806cb51b555d297f38" +dependencies = [ + "arrayvec", +] + [[package]] name = "aws-lc-rs" version = "1.16.3" @@ -775,6 +824,12 @@ dependencies = [ "serde_core", ] +[[package]] +name = "bitstream-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6099cdc01846bc367c4e7dd630dc5966dccf36b652fae7a74e17b640411a91b2" + [[package]] name = "blake2" version = "0.10.6" @@ -815,6 +870,12 @@ dependencies = [ "piper", ] +[[package]] +name = "built" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56ed6191a7e78c36abdb16ab65341eefd73d64d303fffccdbb00d51e4205967b" + [[package]] name = "bumpalo" version = "3.20.2" @@ -833,6 +894,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +[[package]] +name = "byteorder-lite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" + [[package]] name = "bytes" version = "1.11.1" @@ -907,6 +974,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "cfg-expr" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02" +dependencies = [ + "smallvec", + "target-lexicon", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -1588,6 +1665,26 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "equator" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc" +dependencies = [ + "equator-macro", +] + +[[package]] +name = "equator-macro" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1694,6 +1791,15 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "fdeflate" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6853b52649d4ac5c0bd02320cddc5ba956bdb407c4b75a2c6b75bf51500f8c" +dependencies = [ + "simd-adler32", +] + [[package]] name = "fiat-crypto" version = "0.2.9" @@ -2391,6 +2497,39 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "image" +version = "0.25.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85ab80394333c02fe689eaf900ab500fbd0c2213da414687ebf995a65d5a6104" +dependencies = [ + "bytemuck", + "byteorder-lite", + "image-webp", + "moxcms", + "num-traits", + "png", + "zune-core", + "zune-jpeg", +] + +[[package]] +name = "image-converter" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "domain", + "image", + "image-storage", + "object_store", + "ravif", + "tokio", + "tracing", + "uuid", + "webp", +] + [[package]] name = "image-storage" version = "0.1.0" @@ -2405,6 +2544,22 @@ dependencies = [ "uuid", ] +[[package]] +name = "image-webp" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525e9ff3e1a4be2fbea1fdf0e98686a6d98b4d8f937e1bf7402245af1909e8c3" +dependencies = [ + "byteorder-lite", + "quick-error", +] + +[[package]] +name = "imgref" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40fac9d56ed6437b198fddba683305e8e2d651aa42647f00f5ae542e7f5c94a2" + [[package]] name = "impl-more" version = "0.1.9" @@ -2474,6 +2629,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "interpolate_name" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34819042dc3d3971c46c2190835914dfbe0c3c13f61449b2997f4e9722dfa60" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -2490,6 +2656,15 @@ dependencies = [ "serde", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -2653,6 +2828,16 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "libfuzzer-sys" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12a681b7dd8ce12bff52488013ba614b869148d54dd79836ab85aafdd53f08d" +dependencies = [ + "arbitrary", + "cc", +] + [[package]] name = "libm" version = "0.2.16" @@ -2682,6 +2867,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libwebp-sys" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54cd30df7c7165ce74a456e4ca9732c603e8dc5e60784558c1c6dc047f876733" +dependencies = [ + "cc", + "glob", +] + [[package]] name = "line-clipping" version = "0.3.7" @@ -2730,6 +2925,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "loop9" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fae87c125b03c1d2c0150c90365d7d6bcc53fb73a9acaef207d2d065860f062" +dependencies = [ + "imgref", +] + [[package]] name = "lru" version = "0.16.4" @@ -2770,6 +2974,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "maybe-rayon" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea1f30cedd69f0a2954655f7188c6a834246d2bcf1e315e2ac40c4b24dc9519" +dependencies = [ + "cfg-if", +] + [[package]] name = "md-5" version = "0.10.6" @@ -2876,6 +3089,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "moxcms" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb85c154ba489f01b25c0d36ae69a87e4a1c73a72631fc6c0eb6dde34a73e44b" +dependencies = [ + "num-traits", + "pxfm", +] + [[package]] name = "multer" version = "3.1.0" @@ -2916,6 +3139,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c96aba5aa877601bb3f6dd6a63a969e1f82e60646e81e71b14496995e9853c91" +[[package]] +name = "new_debug_unreachable" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" + [[package]] name = "nix" version = "0.29.0" @@ -2954,6 +3183,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "nonempty" version = "0.7.0" @@ -2966,6 +3204,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" +[[package]] +name = "noop_proc_macro" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0676bb32a98c1a483ce53e500a81ad9c3d5b3f7c920c28c24e9cb0980d0b5bc8" + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -3207,6 +3451,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pem" version = "3.0.6" @@ -3397,6 +3647,19 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "png" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60769b8b31b2a9f263dae2776c37b1b28ae246943cf719eb6946a1db05128a61" +dependencies = [ + "bitflags 2.11.1", + "crc32fast", + "fdeflate", + "flate2", + "miniz_oxide", +] + [[package]] name = "polling" version = "3.11.0" @@ -3569,7 +3832,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" dependencies = [ - "toml_edit", + "toml_edit 0.25.11+spec-1.1.0", ] [[package]] @@ -3581,6 +3844,37 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "profiling" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d595e54a326bc53c1c197b32d295e14b169e3cfeaa8dc82b529f947fba6bcf5" +dependencies = [ + "profiling-procmacros", +] + +[[package]] +name = "profiling-procmacros" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4488a4a36b9a4ba6b9334a32a39971f77c1436ec82c38707bce707699cc3bbcb" +dependencies = [ + "quote", + "syn 2.0.117", +] + +[[package]] +name = "pxfm" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0c5ccf5294c6ccd63a74f1565028353830a9c2f5eb0c682c355c471726a6e3f" + +[[package]] +name = "quick-error" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" + [[package]] name = "quick-xml" version = "0.31.0" @@ -3840,6 +4134,55 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "rav1e" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd87ce80a7665b1cce111f8a16c1f3929f6547ce91ade6addf4ec86a8dda5ce9" +dependencies = [ + "arbitrary", + "arg_enum_proc_macro", + "arrayvec", + "av1-grain", + "bitstream-io", + "built", + "cfg-if", + "interpolate_name", + "itertools 0.12.1", + "libc", + "libfuzzer-sys", + "log", + "maybe-rayon", + "new_debug_unreachable", + "noop_proc_macro", + "num-derive", + "num-traits", + "once_cell", + "paste", + "profiling", + "rand 0.8.6", + "rand_chacha 0.3.1", + "simd_helpers", + "system-deps", + "thiserror 1.0.69", + "v_frame", + "wasm-bindgen", +] + +[[package]] +name = "ravif" +version = "0.11.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5825c26fddd16ab9f515930d49028a630efec172e903483c94796cfe31893e6b" +dependencies = [ + "avif-serialize", + "imgref", + "loop9", + "quick-error", + "rav1e", + "rgb", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -4004,6 +4347,12 @@ dependencies = [ "tower-service", ] +[[package]] +name = "rgb" +version = "0.8.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b34b781b31e5d73e9fbc8689c70551fd1ade9a19e3e28cfec8580a79290cc4" + [[package]] name = "ring" version = "0.17.14" @@ -4356,6 +4705,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "serde_spanned" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4480,6 +4838,15 @@ dependencies = [ "simdutf8", ] +[[package]] +name = "simd_helpers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95890f873bec569a0362c235787f3aca6e1e887302ba4840839bcc6459c42da6" +dependencies = [ + "quote", +] + [[package]] name = "simdutf8" version = "0.1.5" @@ -4952,12 +5319,31 @@ dependencies = [ "libc", ] +[[package]] +name = "system-deps" +version = "6.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349" +dependencies = [ + "cfg-expr", + "heck", + "pkg-config", + "toml", + "version-compare", +] + [[package]] name = "tagptr" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tempfile" version = "3.27.0" @@ -4989,7 +5375,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4ea810f0692f9f51b382fff5893887bb4580f5fa246fde546e0b13e7fcee662" dependencies = [ "fnv", - "nom", + "nom 7.1.3", "phf", "phf_codegen", ] @@ -5250,6 +5636,27 @@ dependencies = [ "webpki-roots 0.26.11", ] +[[package]] +name = "toml" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime 0.6.11", + "toml_edit 0.22.27", +] + +[[package]] +name = "toml_datetime" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" +dependencies = [ + "serde", +] + [[package]] name = "toml_datetime" version = "1.1.1+spec-1.1.0" @@ -5259,6 +5666,19 @@ dependencies = [ "serde_core", ] +[[package]] +name = "toml_edit" +version = "0.22.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime 0.6.11", + "winnow 0.7.15", +] + [[package]] name = "toml_edit" version = "0.25.11+spec-1.1.0" @@ -5266,9 +5686,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b59c4d22ed448339746c59b905d24568fcbb3ab65a500494f7b8c3e97739f2b" dependencies = [ "indexmap", - "toml_datetime", + "toml_datetime 1.1.1+spec-1.1.0", "toml_parser", - "winnow", + "winnow 1.0.2", ] [[package]] @@ -5277,7 +5697,7 @@ version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ - "winnow", + "winnow 1.0.2", ] [[package]] @@ -5627,6 +6047,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "v_frame" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "666b7727c8875d6ab5db9533418d7c764233ac9c0cff1d469aec8fa127597be2" +dependencies = [ + "aligned-vec", + "num-traits", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" @@ -5639,6 +6070,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version-compare" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" + [[package]] name = "version_check" version = "0.9.5" @@ -5838,6 +6275,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webp" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c071456adef4aca59bf6a583c46b90ff5eb0b4f758fc347cea81290288f37ce1" +dependencies = [ + "image", + "libwebp-sys", +] + [[package]] name = "webpki-root-certs" version = "1.0.7" @@ -6218,6 +6665,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" +dependencies = [ + "memchr", +] + [[package]] name = "winnow" version = "1.0.2" @@ -6332,6 +6788,7 @@ dependencies = [ "domain", "dotenvy", "export", + "image-converter", "image-storage", "importer", "metadata", @@ -6410,7 +6867,7 @@ dependencies = [ "uds_windows", "uuid", "windows-sys 0.61.2", - "winnow", + "winnow 1.0.2", "zbus_macros", "zbus_names", "zvariant", @@ -6449,7 +6906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7074f3e50b894eac91750142016d30d0a89be8e67dbfd9704fb875825760e52d" dependencies = [ "serde", - "winnow", + "winnow 1.0.2", "zvariant", ] @@ -6588,6 +7045,21 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "zune-core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb8a0807f7c01457d0379ba880ba6322660448ddebc890ce29bb64da71fb40f9" + +[[package]] +name = "zune-jpeg" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27bc9d5b815bc103f142aa054f561d9187d191692ec7c2d1e2b4737f8dbd7296" +dependencies = [ + "zune-core", +] + [[package]] name = "zvariant" version = "5.11.0" @@ -6597,7 +7069,7 @@ dependencies = [ "endi", "enumflags2", "serde", - "winnow", + "winnow 1.0.2", "zvariant_derive", "zvariant_utils", ] @@ -6625,5 +7097,5 @@ dependencies = [ "quote", "serde", "syn 2.0.117", - "winnow", + "winnow 1.0.2", ] diff --git a/Cargo.toml b/Cargo.toml index 1517ca1..8c39ed5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "crates/api-types", "crates/application", "crates/adapters/tmdb-enrichment", + "crates/adapters/image-converter", "crates/domain", "crates/presentation", "crates/tui", @@ -79,3 +80,4 @@ nats = { path = "crates/adapters/nats" } sqlite-event-queue = { path = "crates/adapters/sqlite-event-queue" } postgres-event-queue = { path = "crates/adapters/postgres-event-queue" } importer = { path = "crates/adapters/importer" } +image-converter = { path = "crates/adapters/image-converter" } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 7db1e5f..4327a14 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -43,6 +43,9 @@ pub enum EventPayload { movie_id: String, external_metadata_id: String, }, + ImageStored { + key: String, + }, } impl EventPayload { @@ -55,6 +58,7 @@ impl EventPayload { EventPayload::UserUpdated { .. } => "UserUpdated", EventPayload::ReviewDeleted { .. } => "ReviewDeleted", EventPayload::MovieEnrichmentRequested { .. } => "MovieEnrichmentRequested", + EventPayload::ImageStored { .. } => "ImageStored", } } } @@ -114,6 +118,7 @@ impl From<&DomainEvent> for EventPayload { external_metadata_id: external_metadata_id.clone(), } } + DomainEvent::ImageStored { key } => EventPayload::ImageStored { key: key.clone() }, } } } @@ -171,6 +176,9 @@ impl TryFrom for DomainEvent { external_metadata_id, }) } + EventPayload::ImageStored { key } => { + Ok(DomainEvent::ImageStored { key }) + } } } } @@ -247,4 +255,20 @@ mod tests { assert_eq!(EventPayload::from(&review_updated()).event_type(), "ReviewUpdated"); assert_eq!(EventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered"); } + + #[test] + fn round_trip_image_stored() { + let event = DomainEvent::ImageStored { key: "avatars/abc123".into() }; + let payload = EventPayload::from(&event); + let json = serde_json::to_string(&payload).unwrap(); + let back: EventPayload = serde_json::from_str(&json).unwrap(); + let recovered = DomainEvent::try_from(back).unwrap(); + assert_eq!(EventPayload::from(&event), EventPayload::from(&recovered)); + } + + #[test] + fn image_stored_event_type() { + let payload = EventPayload::from(&DomainEvent::ImageStored { key: "posters/x".into() }); + assert_eq!(payload.event_type(), "ImageStored"); + } } diff --git a/crates/adapters/image-converter/Cargo.toml b/crates/adapters/image-converter/Cargo.toml new file mode 100644 index 0000000..b49baa9 --- /dev/null +++ b/crates/adapters/image-converter/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "image-converter" +version = "0.1.0" +edition = "2021" + +[dependencies] +domain = { workspace = true } +async-trait = { workspace = true } +anyhow = { workspace = true } +tracing = { workspace = true } +tokio = { workspace = true } +image = { version = "0.25", default-features = false, features = ["jpeg", "png", "webp"] } +ravif = { version = "0.11", default-features = false } +webp = "0.3" + +[dev-dependencies] +image-storage = { workspace = true } +object_store = "0.11" +uuid = { workspace = true } diff --git a/crates/adapters/image-converter/src/backfill.rs b/crates/adapters/image-converter/src/backfill.rs new file mode 100644 index 0000000..ee73212 --- /dev/null +++ b/crates/adapters/image-converter/src/backfill.rs @@ -0,0 +1,141 @@ +use std::{sync::Arc, time::Duration}; + +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + ports::{EventPublisher, ImageRefPort, PeriodicJob}, +}; + +pub struct ConversionBackfillJob { + image_ref: Arc, + event_publisher: Arc, +} + +impl ConversionBackfillJob { + pub fn new( + image_ref: Arc, + event_publisher: Arc, + ) -> Self { + Self { image_ref, event_publisher } + } +} + +#[async_trait] +impl PeriodicJob for ConversionBackfillJob { + fn interval(&self) -> Duration { + Duration::from_secs(60 * 60 * 24) // 24h + } + + async fn run(&self) -> Result<(), DomainError> { + let keys = self.image_ref.list_keys().await?; + + for key in keys { + if key.ends_with(".avif") || key.ends_with(".webp") { + continue; + } + if let Err(e) = self.event_publisher + .publish(&DomainEvent::ImageStored { key: key.clone() }) + .await + { + tracing::warn!("backfill: failed to emit ImageStored for {key}: {e}"); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + + struct MockImageRef { + keys: Vec, + } + + #[async_trait::async_trait] + impl ImageRefPort for MockImageRef { + async fn swap(&self, _: &str, _: &str) -> Result<(), DomainError> { Ok(()) } + async fn list_keys(&self) -> Result, DomainError> { + Ok(self.keys.clone()) + } + } + + struct MockPublisher { + emitted: Mutex>, + } + + impl MockPublisher { + fn new() -> Arc { + Arc::new(Self { emitted: Mutex::new(vec![]) }) + } + + fn emitted(&self) -> Vec { + self.emitted.lock().unwrap().clone() + } + } + + #[async_trait::async_trait] + impl EventPublisher for MockPublisher { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + if let DomainEvent::ImageStored { key } = event { + self.emitted.lock().unwrap().push(key.clone()); + } + Ok(()) + } + } + + #[tokio::test] + async fn emits_image_stored_for_unconverted_keys() { + let image_ref = Arc::new(MockImageRef { + keys: vec!["avatars/u1".into(), "posters/m1".into()], + }); + let publisher = MockPublisher::new(); + let job = ConversionBackfillJob::new( + image_ref, + Arc::clone(&publisher) as Arc, + ); + + job.run().await.unwrap(); + + let mut emitted = publisher.emitted(); + emitted.sort(); + assert_eq!(emitted, vec!["avatars/u1", "posters/m1"]); + } + + #[tokio::test] + async fn skips_already_converted_keys() { + let image_ref = Arc::new(MockImageRef { + keys: vec![ + "avatars/u1.avif".into(), + "posters/m1".into(), + "avatars/u2.webp".into(), + ], + }); + let publisher = MockPublisher::new(); + let job = ConversionBackfillJob::new( + image_ref, + Arc::clone(&publisher) as Arc, + ); + + job.run().await.unwrap(); + + assert_eq!(publisher.emitted(), vec!["posters/m1"]); + } + + #[tokio::test] + async fn empty_keys_emits_nothing() { + let image_ref = Arc::new(MockImageRef { keys: vec![] }); + let publisher = MockPublisher::new(); + let job = ConversionBackfillJob::new( + image_ref, + Arc::clone(&publisher) as Arc, + ); + + job.run().await.unwrap(); + + assert!(publisher.emitted().is_empty()); + } +} diff --git a/crates/adapters/image-converter/src/config.rs b/crates/adapters/image-converter/src/config.rs new file mode 100644 index 0000000..d2502df --- /dev/null +++ b/crates/adapters/image-converter/src/config.rs @@ -0,0 +1,90 @@ +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum Format { + Avif, + Webp, +} + +impl Format { + pub fn extension(self) -> &'static str { + match self { + Format::Avif => ".avif", + Format::Webp => ".webp", + } + } +} + +pub struct ConversionConfig { + pub format: Format, +} + +impl ConversionConfig { + pub fn from_env() -> anyhow::Result> { + Self::from_vars( + std::env::var("IMAGE_CONVERSION_ENABLED").ok().as_deref(), + std::env::var("IMAGE_CONVERSION_FORMAT").ok().as_deref(), + ) + } + + fn from_vars(enabled: Option<&str>, format: Option<&str>) -> anyhow::Result> { + if enabled != Some("true") { + return Ok(None); + } + + let format_str = format.ok_or_else(|| { + anyhow::anyhow!("IMAGE_CONVERSION_FORMAT required when IMAGE_CONVERSION_ENABLED=true") + })?; + + let format = match format_str { + "avif" => Format::Avif, + "webp" => Format::Webp, + other => anyhow::bail!( + "Unknown IMAGE_CONVERSION_FORMAT: {other:?}. Valid values: avif, webp" + ), + }; + + Ok(Some(Self { format })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn disabled_by_default() { + assert!(ConversionConfig::from_vars(None, None).unwrap().is_none()); + assert!(ConversionConfig::from_vars(Some("false"), None).unwrap().is_none()); + } + + #[test] + fn enabled_avif() { + let cfg = ConversionConfig::from_vars(Some("true"), Some("avif")).unwrap().unwrap(); + assert_eq!(cfg.format, Format::Avif); + } + + #[test] + fn enabled_webp() { + let cfg = ConversionConfig::from_vars(Some("true"), Some("webp")).unwrap().unwrap(); + assert_eq!(cfg.format, Format::Webp); + } + + #[test] + fn unknown_format_is_error() { + assert!(ConversionConfig::from_vars(Some("true"), Some("gif")).is_err()); + } + + #[test] + fn missing_format_when_enabled_is_error() { + assert!(ConversionConfig::from_vars(Some("true"), None).is_err()); + } + + #[test] + fn avif_extension() { + assert_eq!(Format::Avif.extension(), ".avif"); + } + + #[test] + fn webp_extension() { + assert_eq!(Format::Webp.extension(), ".webp"); + } +} diff --git a/crates/adapters/image-converter/src/handler.rs b/crates/adapters/image-converter/src/handler.rs new file mode 100644 index 0000000..ea679e0 --- /dev/null +++ b/crates/adapters/image-converter/src/handler.rs @@ -0,0 +1,224 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + ports::{EventHandler, ImageRefPort, ImageStorage}, +}; + +use crate::Format; + +pub struct ImageConversionHandler { + storage: Arc, + image_ref: Arc, + format: Format, +} + +impl ImageConversionHandler { + pub fn new( + storage: Arc, + image_ref: Arc, + format: Format, + ) -> Self { + Self { storage, image_ref, format } + } +} + +#[async_trait] +impl EventHandler for ImageConversionHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let key = match event { + DomainEvent::ImageStored { key } => key.clone(), + _ => return Ok(()), + }; + + if key.ends_with(".avif") || key.ends_with(".webp") { + return Ok(()); + } + + let bytes = self.storage.get(&key).await?; + let format = self.format; + + let converted = tokio::task::spawn_blocking(move || convert(bytes, format)) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))? + .map_err(|e| DomainError::InfrastructureError(e))?; + + let ext = format.extension(); + let new_key = format!("{key}{ext}"); + self.storage.store(&new_key, &converted).await?; + + if let Err(e) = self.image_ref.swap(&key, &new_key).await { + tracing::error!("ImageRefPort::swap failed for {key} → {new_key}: {e}"); + return Err(e); + } + + if let Err(e) = self.storage.delete(&key).await { + tracing::warn!("failed to delete old image key {key}: {e}"); + } + + tracing::info!("converted {key} → {new_key}"); + Ok(()) + } +} + +fn convert(bytes: Vec, format: Format) -> Result, String> { + let img = image::load_from_memory(&bytes).map_err(|e| e.to_string())?; + + match format { + Format::Avif => { + let rgba = img.to_rgba8(); + let width = rgba.width() as usize; + let height = rgba.height() as usize; + let pixels: Vec = rgba + .pixels() + .map(|p| ravif::RGBA8 { r: p.0[0], g: p.0[1], b: p.0[2], a: p.0[3] }) + .collect(); + let result = ravif::Encoder::new() + .with_quality(80.0) + .with_speed(6) + .encode_rgba(ravif::Img::new(&pixels, width, height)) + .map_err(|e| e.to_string())?; + Ok(result.avif_file.to_vec()) + } + Format::Webp => { + let rgba = img.to_rgba8(); + let (width, height) = (rgba.width(), rgba.height()); + let encoder = webp::Encoder::from_rgba(rgba.as_raw(), width, height); + Ok(encoder.encode(80.0).to_vec()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + use object_store::memory::InMemory; + use image_storage::ImageStorageAdapter; + + struct MockImageRef { + swaps: Mutex>, + } + + impl MockImageRef { + fn new() -> Arc { + Arc::new(Self { swaps: Mutex::new(vec![]) }) + } + + fn swaps(&self) -> Vec<(String, String)> { + self.swaps.lock().unwrap().clone() + } + } + + #[async_trait::async_trait] + impl ImageRefPort for MockImageRef { + async fn swap(&self, old: &str, new: &str) -> Result<(), DomainError> { + self.swaps.lock().unwrap().push((old.into(), new.into())); + Ok(()) + } + async fn list_keys(&self) -> Result, DomainError> { + Ok(vec![]) + } + } + + fn in_memory_storage() -> Arc { + Arc::new(ImageStorageAdapter::new(Arc::new(InMemory::new()))) + } + + fn tiny_jpeg() -> Vec { + use image::{DynamicImage, ImageBuffer, Rgb}; + let img = DynamicImage::ImageRgb8( + ImageBuffer::from_pixel(4, 4, Rgb([200u8, 100, 50])), + ); + let mut buf = std::io::Cursor::new(Vec::new()); + img.write_to(&mut buf, image::ImageFormat::Jpeg).unwrap(); + buf.into_inner() + } + + #[tokio::test] + async fn ignores_non_image_stored_events() { + let storage = in_memory_storage(); + let image_ref = MockImageRef::new(); + let handler = ImageConversionHandler::new( + Arc::clone(&storage) as Arc, + Arc::clone(&image_ref) as Arc, + Format::Avif, + ); + + handler.handle(&DomainEvent::UserUpdated { + user_id: domain::value_objects::UserId::from_uuid(uuid::Uuid::new_v4()), + }).await.unwrap(); + + assert!(image_ref.swaps().is_empty()); + } + + #[tokio::test] + async fn skips_already_converted_avif_key() { + let storage = in_memory_storage(); + storage.store("avatars/u1.avif", &tiny_jpeg()).await.unwrap(); + let image_ref = MockImageRef::new(); + let handler = ImageConversionHandler::new( + Arc::clone(&storage) as Arc, + Arc::clone(&image_ref) as Arc, + Format::Avif, + ); + + handler.handle(&DomainEvent::ImageStored { key: "avatars/u1.avif".into() }).await.unwrap(); + + assert!(image_ref.swaps().is_empty()); + } + + #[tokio::test] + async fn skips_already_converted_webp_key() { + let storage = in_memory_storage(); + storage.store("posters/m1.webp", &tiny_jpeg()).await.unwrap(); + let image_ref = MockImageRef::new(); + let handler = ImageConversionHandler::new( + Arc::clone(&storage) as Arc, + Arc::clone(&image_ref) as Arc, + Format::Webp, + ); + + handler.handle(&DomainEvent::ImageStored { key: "posters/m1.webp".into() }).await.unwrap(); + + assert!(image_ref.swaps().is_empty()); + } + + #[tokio::test] + async fn converts_jpeg_to_avif_and_swaps_key() { + let storage = in_memory_storage(); + storage.store("avatars/u1", &tiny_jpeg()).await.unwrap(); + let image_ref = MockImageRef::new(); + let handler = ImageConversionHandler::new( + Arc::clone(&storage) as Arc, + Arc::clone(&image_ref) as Arc, + Format::Avif, + ); + + handler.handle(&DomainEvent::ImageStored { key: "avatars/u1".into() }).await.unwrap(); + + assert_eq!(image_ref.swaps(), vec![("avatars/u1".into(), "avatars/u1.avif".into())]); + assert!(storage.get("avatars/u1.avif").await.is_ok()); + assert!(storage.get("avatars/u1").await.is_err()); + } + + #[tokio::test] + async fn converts_jpeg_to_webp_and_swaps_key() { + let storage = in_memory_storage(); + storage.store("avatars/u1", &tiny_jpeg()).await.unwrap(); + let image_ref = MockImageRef::new(); + let handler = ImageConversionHandler::new( + Arc::clone(&storage) as Arc, + Arc::clone(&image_ref) as Arc, + Format::Webp, + ); + + handler.handle(&DomainEvent::ImageStored { key: "avatars/u1".into() }).await.unwrap(); + + assert_eq!(image_ref.swaps(), vec![("avatars/u1".into(), "avatars/u1.webp".into())]); + assert!(storage.get("avatars/u1.webp").await.is_ok()); + assert!(storage.get("avatars/u1").await.is_err()); + } +} diff --git a/crates/adapters/image-converter/src/lib.rs b/crates/adapters/image-converter/src/lib.rs new file mode 100644 index 0000000..16370da --- /dev/null +++ b/crates/adapters/image-converter/src/lib.rs @@ -0,0 +1,36 @@ +mod backfill; +mod config; +mod handler; + +pub use backfill::ConversionBackfillJob; +pub use config::{ConversionConfig, Format}; +pub use handler::ImageConversionHandler; + +use std::sync::Arc; +use domain::ports::{EventHandler, EventPublisher, ImageRefPort, ImageStorage, PeriodicJob}; + +pub fn build( + image_storage: Arc, + image_ref: Arc, + event_publisher: Arc, +) -> anyhow::Result, Arc)>> { + let config = match ConversionConfig::from_env()? { + Some(c) => c, + None => return Ok(None), + }; + + let format = config.format; + + let handler = Arc::new(ImageConversionHandler::new( + Arc::clone(&image_storage), + Arc::clone(&image_ref), + format, + )) as Arc; + + let job = Arc::new(ConversionBackfillJob::new( + Arc::clone(&image_ref), + Arc::clone(&event_publisher), + )) as Arc; + + Ok(Some((handler, job))) +} diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index c991607..de8b5b5 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -9,6 +9,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::MovieDeleted { .. } => "movie.deleted", DomainEvent::UserUpdated { .. } => "user.updated", DomainEvent::MovieEnrichmentRequested { .. } => "movie.enrichment.requested", + DomainEvent::ImageStored { .. } => "image.stored", }; format!("{prefix}.{suffix}") } diff --git a/crates/adapters/poster-sync/src/lib.rs b/crates/adapters/poster-sync/src/lib.rs index 38e723c..8670086 100644 --- a/crates/adapters/poster-sync/src/lib.rs +++ b/crates/adapters/poster-sync/src/lib.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use domain::{ errors::DomainError, events::DomainEvent, - ports::{EventHandler, ImageStorage, MetadataClient, MovieRepository, PosterFetcherClient}, + ports::{EventHandler, EventPublisher, ImageStorage, MetadataClient, MovieRepository, PosterFetcherClient}, value_objects::{ExternalMetadataId, MovieId, PosterPath}, }; @@ -13,6 +13,7 @@ pub struct PosterSyncHandler { metadata_client: Arc, poster_fetcher: Arc, image_storage: Arc, + event_publisher: Arc, max_retries: u32, } @@ -22,9 +23,10 @@ impl PosterSyncHandler { metadata_client: Arc, poster_fetcher: Arc, image_storage: Arc, + event_publisher: Arc, max_retries: u32, ) -> Self { - Self { movie_repository, metadata_client, poster_fetcher, image_storage, max_retries } + Self { movie_repository, metadata_client, poster_fetcher, image_storage, event_publisher, max_retries } } async fn sync(&self, movie_id: MovieId, external_metadata_id: ExternalMetadataId) -> Result<(), DomainError> { @@ -47,6 +49,12 @@ impl PosterSyncHandler { let image_bytes = self.poster_fetcher.fetch_poster_bytes(&poster_url).await?; let stored_path = self.image_storage.store(&movie_id.value().to_string(), &image_bytes).await?; + if let Err(e) = self.event_publisher + .publish(&DomainEvent::ImageStored { key: stored_path.clone() }) + .await + { + tracing::warn!("failed to emit ImageStored for {stored_path}: {e}"); + } let poster_path = PosterPath::new(stored_path)?; movie.update_poster(poster_path); diff --git a/crates/adapters/postgres/src/image_ref.rs b/crates/adapters/postgres/src/image_ref.rs new file mode 100644 index 0000000..537c559 --- /dev/null +++ b/crates/adapters/postgres/src/image_ref.rs @@ -0,0 +1,48 @@ +use async_trait::async_trait; +use domain::{errors::DomainError, ports::ImageRefPort}; +use sqlx::PgPool; +use std::sync::Arc; + +pub struct PostgresImageRefAdapter { + pool: PgPool, +} + +impl PostgresImageRefAdapter { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +pub fn create_image_ref(pool: PgPool) -> Arc { + Arc::new(PostgresImageRefAdapter::new(pool)) +} + +#[async_trait] +impl ImageRefPort for PostgresImageRefAdapter { + async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> { + let mut tx = self.pool.begin().await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + sqlx::query("UPDATE users SET avatar_path = $1 WHERE avatar_path = $2") + .bind(new_key).bind(old_key) + .execute(&mut *tx).await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + sqlx::query("UPDATE movies SET poster_path = $1 WHERE poster_path = $2") + .bind(new_key).bind(old_key) + .execute(&mut *tx).await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + tx.commit().await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + + async fn list_keys(&self) -> Result, DomainError> { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL + UNION + SELECT poster_path FROM movies WHERE poster_path IS NOT NULL", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(rows.into_iter().map(|(k,)| k).collect()) + } +} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index ae124ec..4fa9044 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -12,6 +12,7 @@ use domain::{ }; use sqlx::PgPool; +mod image_ref; mod import_profile; mod import_session; mod models; @@ -23,6 +24,7 @@ use models::{ UserTotalsRow, datetime_to_str, }; +pub use image_ref::{PostgresImageRefAdapter, create_image_ref}; pub use import_profile::PostgresImportProfileRepository; pub use import_session::PostgresImportSessionRepository; pub use profile::PostgresMovieProfileRepository; diff --git a/crates/adapters/sqlite/src/image_ref.rs b/crates/adapters/sqlite/src/image_ref.rs new file mode 100644 index 0000000..ce9dce1 --- /dev/null +++ b/crates/adapters/sqlite/src/image_ref.rs @@ -0,0 +1,155 @@ +use async_trait::async_trait; +use domain::{errors::DomainError, ports::ImageRefPort}; +use sqlx::SqlitePool; +use std::sync::Arc; + +pub struct SqliteImageRefAdapter { + pool: SqlitePool, +} + +impl SqliteImageRefAdapter { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +pub fn create_image_ref(pool: SqlitePool) -> Arc { + Arc::new(SqliteImageRefAdapter::new(pool)) +} + +#[async_trait] +impl ImageRefPort for SqliteImageRefAdapter { + async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> { + let mut tx = self.pool.begin().await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + sqlx::query("UPDATE users SET avatar_path = ? WHERE avatar_path = ?") + .bind(new_key).bind(old_key) + .execute(&mut *tx).await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + sqlx::query("UPDATE movies SET poster_path = ? WHERE poster_path = ?") + .bind(new_key).bind(old_key) + .execute(&mut *tx).await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + tx.commit().await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + + async fn list_keys(&self) -> Result, DomainError> { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL + UNION + SELECT poster_path FROM movies WHERE poster_path IS NOT NULL", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(rows.into_iter().map(|(k,)| k).collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + async fn setup(pool: &SqlitePool) { + sqlx::query( + "CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + email TEXT NOT NULL, + username TEXT NOT NULL, + password_hash TEXT NOT NULL, + created_at TEXT NOT NULL, + role TEXT NOT NULL DEFAULT 'standard', + bio TEXT, + avatar_path TEXT + )", + ) + .execute(pool) + .await + .unwrap(); + + sqlx::query( + "CREATE TABLE IF NOT EXISTS movies ( + id TEXT PRIMARY KEY, + external_metadata_id TEXT, + title TEXT NOT NULL, + release_year INTEGER, + director TEXT, + poster_path TEXT + )", + ) + .execute(pool) + .await + .unwrap(); + } + + #[tokio::test] + async fn list_keys_returns_both_avatar_and_poster_paths() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + setup(&pool).await; + + sqlx::query("INSERT INTO users VALUES ('u1','e@e.com','u','h','2024-01-01','standard',NULL,'avatars/u1')") + .execute(&pool).await.unwrap(); + sqlx::query("INSERT INTO movies VALUES ('m1','tt1','Title',2020,'Dir','posters/m1')") + .execute(&pool).await.unwrap(); + + let adapter = SqliteImageRefAdapter::new(pool); + let mut keys = adapter.list_keys().await.unwrap(); + keys.sort(); + + assert_eq!(keys, vec!["avatars/u1", "posters/m1"]); + } + + #[tokio::test] + async fn list_keys_excludes_nulls() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + setup(&pool).await; + + sqlx::query("INSERT INTO users VALUES ('u1','e@e.com','u','h','2024-01-01','standard',NULL,NULL)") + .execute(&pool).await.unwrap(); + + let adapter = SqliteImageRefAdapter::new(pool); + assert_eq!(adapter.list_keys().await.unwrap(), Vec::::new()); + } + + #[tokio::test] + async fn swap_updates_avatar_path() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + setup(&pool).await; + + sqlx::query("INSERT INTO users VALUES ('u1','e@e.com','u','h','2024-01-01','standard',NULL,'avatars/u1')") + .execute(&pool).await.unwrap(); + + let adapter = SqliteImageRefAdapter::new(pool.clone()); + adapter.swap("avatars/u1", "avatars/u1.avif").await.unwrap(); + + let row: (Option,) = sqlx::query_as("SELECT avatar_path FROM users WHERE id='u1'") + .fetch_one(&pool).await.unwrap(); + assert_eq!(row.0.as_deref(), Some("avatars/u1.avif")); + } + + #[tokio::test] + async fn swap_updates_poster_path() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + setup(&pool).await; + + sqlx::query("INSERT INTO movies VALUES ('m1','tt1','Title',2020,'Dir','posters/m1')") + .execute(&pool).await.unwrap(); + + let adapter = SqliteImageRefAdapter::new(pool.clone()); + adapter.swap("posters/m1", "posters/m1.avif").await.unwrap(); + + let row: (Option,) = sqlx::query_as("SELECT poster_path FROM movies WHERE id='m1'") + .fetch_one(&pool).await.unwrap(); + assert_eq!(row.0.as_deref(), Some("posters/m1.avif")); + } + + #[tokio::test] + async fn swap_noop_when_key_not_found() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + setup(&pool).await; + + let adapter = SqliteImageRefAdapter::new(pool); + adapter.swap("missing/key", "missing/key.avif").await.unwrap(); + } +} diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index 2c67771..ccd2aa5 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -12,6 +12,7 @@ use domain::{ }; use sqlx::SqlitePool; +mod image_ref; mod import_profile; mod import_session; mod migrations; @@ -24,6 +25,7 @@ use models::{ UserTotalsRow, datetime_to_str, }; +pub use image_ref::{SqliteImageRefAdapter, create_image_ref}; pub use import_profile::SqliteImportProfileRepository; pub use import_session::SqliteImportSessionRepository; pub use profile::SqliteMovieProfileRepository; diff --git a/crates/application/src/use_cases/sync_poster.rs b/crates/application/src/use_cases/sync_poster.rs index 5fd8d9c..1fc11e0 100644 --- a/crates/application/src/use_cases/sync_poster.rs +++ b/crates/application/src/use_cases/sync_poster.rs @@ -1,5 +1,6 @@ use domain::{ errors::DomainError, + events::DomainEvent, value_objects::{ExternalMetadataId, MovieId, PosterPath}, }; @@ -39,6 +40,14 @@ pub async fn execute(ctx: &AppContext, cmd: SyncPosterCommand) -> Result<(), Dom .image_storage .store(&movie_id.value().to_string(), &image_bytes) .await?; + + if let Err(e) = ctx.event_publisher + .publish(&DomainEvent::ImageStored { key: stored_path.clone() }) + .await + { + tracing::warn!("failed to emit ImageStored for {stored_path}: {e}"); + } + let poster_path = PosterPath::new(stored_path)?; movie.update_poster(poster_path); diff --git a/crates/application/src/use_cases/update_profile.rs b/crates/application/src/use_cases/update_profile.rs index 7cb1762..b4aafd6 100644 --- a/crates/application/src/use_cases/update_profile.rs +++ b/crates/application/src/use_cases/update_profile.rs @@ -27,6 +27,14 @@ pub async fn execute(ctx: &AppContext, cmd: UpdateProfileCommand) -> Result<(), } let key = format!("avatars/{}", user_id.value()); let stored = ctx.image_storage.store(&key, &bytes).await?; + + if let Err(e) = ctx.event_publisher + .publish(&DomainEvent::ImageStored { key: stored.clone() }) + .await + { + tracing::warn!("failed to emit ImageStored for {stored}: {e}"); + } + Some(stored) } else { user.avatar_path().map(|s| s.to_string()) diff --git a/crates/application/src/worker.rs b/crates/application/src/worker.rs index 6ff74ea..d4d21fd 100644 --- a/crates/application/src/worker.rs +++ b/crates/application/src/worker.rs @@ -97,6 +97,7 @@ mod tests { DomainEvent::MovieDeleted { .. } => "movie_deleted", DomainEvent::UserUpdated { .. } => "user_updated", DomainEvent::MovieEnrichmentRequested { .. } => "movie_enrichment_requested", + DomainEvent::ImageStored { .. } => "image_stored", }; self.calls.lock().unwrap().push(label); Ok(()) diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 5406c30..b1beb28 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -41,6 +41,9 @@ pub enum DomainEvent { movie_id: MovieId, external_metadata_id: String, }, + ImageStored { + key: String, + }, } #[async_trait] diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 4cca8ce..3ce9a38 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -264,3 +264,9 @@ pub trait ImportProfileRepository: Send + Sync { async fn get(&self, id: &ImportProfileId, user_id: &UserId) -> Result, DomainError>; async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError>; } + +#[async_trait] +pub trait ImageRefPort: Send + Sync { + async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError>; + async fn list_keys(&self) -> Result, DomainError>; +} diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index a9c8f5f..b5f1bc2 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -28,6 +28,7 @@ poster-sync = { workspace = true } export = { workspace = true } tmdb-enrichment = { workspace = true } importer = { workspace = true } +image-converter = { workspace = true } nats = { workspace = true, optional = true } sqlx = { workspace = true } diff --git a/crates/worker/src/db.rs b/crates/worker/src/db.rs index 3b83a1c..01be48a 100644 --- a/crates/worker/src/db.rs +++ b/crates/worker/src/db.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use anyhow::Context; use domain::ports::{ - DiaryRepository, ImportProfileRepository, ImportSessionRepository, MovieProfileRepository, - MovieRepository, ReviewRepository, StatsRepository, UserRepository, + DiaryRepository, ImageRefPort, ImportProfileRepository, ImportSessionRepository, + MovieProfileRepository, MovieRepository, ReviewRepository, StatsRepository, UserRepository, }; pub enum DbPool { @@ -22,6 +22,7 @@ pub struct Repos { pub import_session: Arc, pub import_profile: Arc, pub movie_profile: Arc, + pub image_ref: Arc, } pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> { @@ -30,13 +31,15 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos "postgres" => { let (pool, m, r, d, s, u, is, ip, mp) = postgres::wire(database_url).await.context("PostgreSQL connection failed")?; - Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp }, DbPool::Postgres(pool))) + let image_ref = postgres::create_image_ref(pool.clone()); + Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, image_ref }, DbPool::Postgres(pool))) } #[cfg(feature = "sqlite")] _ => { let (pool, m, r, d, s, u, is, ip, mp) = sqlite::wire(database_url).await.context("SQLite connection failed")?; - Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp }, DbPool::Sqlite(pool))) + let image_ref = sqlite::create_image_ref(pool.clone()); + Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, image_ref }, DbPool::Sqlite(pool))) } #[cfg(not(feature = "sqlite"))] _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"), diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 7924e6c..5b0a32f 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -31,6 +31,9 @@ async fn main() -> anyhow::Result<()> { let (repos, db_pool) = db::connect(&database_url, &backend).await?; let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?; + // Save image_ref before ctx consumes repos. + let image_ref = Arc::clone(&repos.image_ref); + // Clone refs federation handler needs before ctx consumes them. #[cfg(feature = "federation")] let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url, allow_registration) = ( @@ -84,12 +87,21 @@ async fn main() -> anyhow::Result<()> { } }; + // ── Image conversion ────────────────────────────────────────────────────── + + let conversion = image_converter::build( + Arc::clone(&ctx.image_storage), + image_ref, + Arc::clone(&ctx.event_publisher), + )?; + // ── Periodic jobs ───────────────────────────────────────────────────────── let mut periodic_jobs: Vec> = vec![ Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())), ]; if let Some(job) = enrichment_job { periodic_jobs.push(job); } + if let Some((_, ref conv_job)) = conversion { periodic_jobs.push(Arc::clone(conv_job)); } for job in periodic_jobs { tokio::spawn(async move { @@ -111,6 +123,7 @@ async fn main() -> anyhow::Result<()> { Arc::clone(&ctx.metadata_client), Arc::clone(&ctx.poster_fetcher), Arc::clone(&ctx.image_storage), + Arc::clone(&ctx.event_publisher), 3, )) as Arc; @@ -122,6 +135,7 @@ async fn main() -> anyhow::Result<()> { { let mut h = vec![poster, cleanup]; if let Some(e) = enrichment_handler { h.push(e); } + if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); } h } @@ -148,6 +162,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!("federation event handler registered"); let mut h = vec![poster, cleanup, ap]; if let Some(e) = enrichment_handler { h.push(e); } + if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); } h } };