From ecb61f9b8f733698edd44d5d725a7a6071d30788 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 29 May 2026 03:47:06 +0200 Subject: [PATCH] feat: add federation processed activities table and update dependencies - Created a new SQL migration to add the `federation_processed_activities` table with an index on `processed_at`. - Updated dependencies in `Cargo.toml` files across `bootstrap` and `worker` crates, including version updates for `k-ap`. - Enhanced the event publishing mechanism in the `factory.rs` file to include a new `KapPublisher` for handling federation events. - Refactored the `build` function in `factory.rs` to accommodate the new event publisher and improve ActivityPub service initialization. - Modified the worker's main loop to handle new federation event types and improved error handling for event processing. Co-authored-by: Copilot --- .cargo/config.toml | 9 + .gitignore | 1 + Cargo.lock | 278 +++++--- Dockerfile | 1 + crates/adapters/activitypub/Cargo.toml | 29 +- crates/adapters/activitypub/src/handler.rs | 63 +- crates/adapters/activitypub/src/service.rs | 29 +- crates/adapters/event-payload/src/lib.rs | 17 + .../adapters/postgres-federation/Cargo.toml | 16 +- .../adapters/postgres-federation/src/lib.rs | 630 +++++++++++++----- .../014_federation_processed_activities.sql | 7 + crates/bootstrap/Cargo.toml | 47 +- crates/bootstrap/src/factory.rs | 96 ++- crates/worker/Cargo.toml | 34 +- crates/worker/src/factory.rs | 48 +- crates/worker/src/main.rs | 172 +++-- 16 files changed, 1016 insertions(+), 461 deletions(-) create mode 100644 .cargo/config.toml create mode 100644 crates/adapters/postgres/migrations/014_federation_processed_activities.sql diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..d292742 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,9 @@ +[registry] +default = "gitea" + +[registries.gitea] +index = "sparse+https://git.gabrielkaszewski.dev/api/packages/GKaszewski/cargo/" # Sparse index +# index = "https://git.gabrielkaszewski.dev/GKaszewski/_cargo-index.git" # Git + +[net] +git-fetch-with-cli = true \ No newline at end of file diff --git a/.gitignore b/.gitignore index ef97b88..53365f0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .env +/.superpowers/ /target /docs/superpowers/ diff --git a/Cargo.lock b/Cargo.lock index a1c7703..873cd73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,7 +6,6 @@ version = 4 name = "activitypub" version = "0.1.0" dependencies = [ - "activitypub_federation", "anyhow", "async-trait", "axum", @@ -14,7 +13,7 @@ dependencies = [ "domain", "futures", "k-ap", - "reqwest 0.13.3", + "reqwest 0.13.4", "serde", "serde_json", "tokio", @@ -43,7 +42,7 @@ dependencies = [ "futures", "futures-core", "http 0.2.12", - "http 1.4.0", + "http 1.4.1", "http-signature-normalization", "http-signature-normalization-reqwest", "httpdate", @@ -52,7 +51,7 @@ dependencies = [ "pin-project-lite", "rand 0.8.6", "regex", - "reqwest 0.13.3", + "reqwest 0.13.4", "reqwest-middleware", "rsa", "serde", @@ -217,7 +216,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", - "socket2 0.6.3", + "socket2 0.6.4", "time", "tracing", "url", @@ -426,9 +425,9 @@ dependencies = [ [[package]] name = "autocfg" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" [[package]] name = "aws-lc-rs" @@ -463,7 +462,7 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http 1.4.0", + "http 1.4.1", "http-body", "http-body-util", "hyper", @@ -495,7 +494,7 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http 1.4.0", + "http 1.4.1", "http-body", "http-body-util", "mime", @@ -584,6 +583,7 @@ name = "bootstrap" version = "0.1.0" dependencies = [ "activitypub", + "anyhow", "application", "async-nats", "async-trait", @@ -591,14 +591,16 @@ dependencies = [ "axum", "domain", "dotenvy", + "event-payload", "event-transport", - "http 1.4.0", + "http 1.4.1", "k-ap", "nats", "postgres", "postgres-federation", "postgres-search", "presentation", + "serde_json", "sqlx", "storage", "tokio", @@ -610,9 +612,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.20.2" +version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" [[package]] name = "byteorder" @@ -914,9 +916,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "6.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" dependencies = [ "cfg-if", "crossbeam-utils", @@ -1032,9 +1034,9 @@ dependencies = [ [[package]] name = "displaydoc" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f" dependencies = [ "proc-macro2", "quote", @@ -1101,9 +1103,9 @@ dependencies = [ [[package]] name = "either" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" dependencies = [ "serde", ] @@ -1372,9 +1374,9 @@ checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" [[package]] name = "futures-timer" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" +checksum = "af43fadb8a98512d547e37b4e92e0ced13e205c061b87b4623eff01d918d6968" [[package]] name = "futures-util" @@ -1478,7 +1480,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.4.0", + "http 1.4.1", "indexmap", "slab", "tokio", @@ -1581,9 +1583,9 @@ dependencies = [ [[package]] name = "http" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0" dependencies = [ "bytes", "itoa", @@ -1596,7 +1598,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.4.0", + "http 1.4.1", ] [[package]] @@ -1607,7 +1609,7 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http 1.4.0", + "http 1.4.1", "http-body", "pin-project-lite", ] @@ -1631,7 +1633,7 @@ dependencies = [ "base64", "http-signature-normalization", "httpdate", - "reqwest 0.13.3", + "reqwest 0.13.4", "reqwest-middleware", "sha2", "tokio", @@ -1657,16 +1659,16 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" [[package]] name = "hyper" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +checksum = "eb92f162bf56536459fc83c79b974bb12837acfed43d6bc370a7916d0ae15ecc" dependencies = [ "atomic-waker", "bytes", "futures-channel", "futures-core", "h2", - "http 1.4.0", + "http 1.4.1", "http-body", "httparse", "httpdate", @@ -1683,7 +1685,7 @@ version = "0.27.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" dependencies = [ - "http 1.4.0", + "http 1.4.1", "hyper", "hyper-util", "rustls", @@ -1716,14 +1718,14 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.4.0", + "http 1.4.1", "http-body", "hyper", "ipnet", "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.3", + "socket2 0.6.4", "system-configuration", "tokio", "tower-service", @@ -1988,9 +1990,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.98" +version = "0.3.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" +checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11" dependencies = [ "cfg-if", "futures-util", @@ -2015,8 +2017,9 @@ dependencies = [ [[package]] name = "k-ap" -version = "0.1.10" -source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.10#d80cfd0431205498161db8665fd884710866ca95" +version = "0.3.0" +source = "sparse+https://git.gabrielkaszewski.dev/api/packages/GKaszewski/cargo/" +checksum = "ce46ce331b0c85e5d9e1874056266d78d6a478f35ae188586fe488d840596346" dependencies = [ "activitypub_federation", "anyhow", @@ -2025,13 +2028,14 @@ dependencies = [ "chrono", "enum_delegate", "futures", - "reqwest 0.13.3", + "reqwest 0.13.4", "serde", "serde_json", "tokio", "tracing", "url", "uuid", + "zeroize", ] [[package]] @@ -2069,14 +2073,14 @@ checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libredox" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c" +checksum = "f02ab6bace2054fb888a3c16f990117b579d14a3088e472d63c6011fa185c9d3" dependencies = [ "bitflags", "libc", "plain", - "redox_syscall 0.7.5", + "redox_syscall 0.8.0", ] [[package]] @@ -2112,9 +2116,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.29" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +checksum = "616ec5685824bcc94416c6d4a7a446eea774a31efd7062c8480ba6fd06d7a6e5" [[package]] name = "lru-slab" @@ -2149,9 +2153,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.8.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" [[package]] name = "mime" @@ -2181,9 +2185,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" dependencies = [ "libc", "log", @@ -2220,7 +2224,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http 1.4.0", + "http 1.4.1", "httparse", "memchr", "mime", @@ -2318,9 +2322,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" +checksum = "521739c6d2bac4aa25192232afe6841231376b2b26d4d9fae5ecf8ca5772e441" [[package]] name = "num-integer" @@ -2678,7 +2682,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.6.3", + "socket2 0.6.4", "thiserror 2.0.18", "tokio", "tracing", @@ -2716,9 +2720,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.3", + "socket2 0.6.4", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.60.2", ] [[package]] @@ -2838,9 +2842,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4666a1a60d8412eab19d94f6d13dcc9cea0a5ef4fdf6a5db306537413c661b1b" +checksum = "7c7591fa2c6b601dfcfe5f043f65a1c39fcdf50efefcd7f1572e538c1f4b398d" dependencies = [ "bitflags", ] @@ -2891,7 +2895,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http 1.4.0", + "http 1.4.1", "http-body", "http-body-util", "hyper", @@ -2924,9 +2928,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" dependencies = [ "base64", "bytes", @@ -2934,7 +2938,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http 1.4.0", + "http 1.4.1", "http-body", "http-body-util", "hyper", @@ -2967,14 +2971,14 @@ dependencies = [ [[package]] name = "reqwest-middleware" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "199dda04a536b532d0cc04d7979e39b1c763ea749bf91507017069c00b96056f" +checksum = "07bc3f1384cffa4f274dad2d4ddd73aed32fed8f786d96c6be8aa4e5fd3c3b58" dependencies = [ "anyhow", "async-trait", - "http 1.4.0", - "reqwest 0.13.3", + "http 1.4.1", + "reqwest 0.13.4", "thiserror 2.0.18", "tower-service", ] @@ -3235,9 +3239,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "indexmap", "itoa", @@ -3447,9 +3451,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", "windows-sys 0.61.2", @@ -3909,7 +3913,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.3", + "socket2 0.6.4", "tokio-macros", "windows-sys 0.61.2", ] @@ -3969,7 +3973,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", - "http 1.4.0", + "http 1.4.1", "httparse", "rand 0.8.6", "ring", @@ -3991,7 +3995,7 @@ dependencies = [ "base64", "bytes", "h2", - "http 1.4.0", + "http 1.4.1", "http-body", "http-body-util", "hyper", @@ -3999,7 +4003,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "socket2 0.6.3", + "socket2 0.6.4", "sync_wrapper", "tokio", "tokio-stream", @@ -4030,14 +4034,14 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.10" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68d6fdd9f81c2819c9a8b0e0cd91660e7746a8e6ea2ba7c6b2b057985f6bcb51" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ "bitflags", "bytes", "futures-util", - "http 1.4.0", + "http 1.4.1", "http-body", "pin-project-lite", "tower", @@ -4068,7 +4072,7 @@ dependencies = [ "axum", "forwarded-header-value", "governor", - "http 1.4.0", + "http 1.4.1", "pin-project", "thiserror 2.0.18", "tonic", @@ -4374,9 +4378,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" +checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409" dependencies = [ "cfg-if", "once_cell", @@ -4387,9 +4391,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.71" +version = "0.4.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96492d0d3ffba25305a7dc88720d250b1401d7edca02cc3bcd50633b424673b8" +checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f" dependencies = [ "js-sys", "wasm-bindgen", @@ -4397,9 +4401,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" +checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4407,9 +4411,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" +checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e" dependencies = [ "bumpalo", "proc-macro2", @@ -4420,9 +4424,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" +checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437" dependencies = [ "unicode-ident", ] @@ -4489,9 +4493,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.98" +version = "0.3.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b572dff8bcf38bad0fa19729c89bb5748b2b9b1d8be70cf90df697e3a8f32aa" +checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436" dependencies = [ "js-sys", "wasm-bindgen", @@ -4566,7 +4570,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -4663,6 +4667,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -4696,13 +4709,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -4715,6 +4745,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -4727,6 +4763,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -4739,12 +4781,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -4757,6 +4811,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -4769,6 +4829,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -4781,6 +4847,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -4793,6 +4865,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "wit-bindgen" version = "0.51.0" @@ -4908,6 +4986,8 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "url", + "uuid", ] [[package]] @@ -4941,18 +5021,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.48" +version = "0.8.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +checksum = "bce33a6288fa3f072a8c2c7d0f2fdbb90e28298f0135c1f99b96c3db2efcc60b" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.48" +version = "0.8.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +checksum = "8fd425244944f4ab65ccff928e7323354c5a018c75838362fdce749dfad2ee1e" dependencies = [ "proc-macro2", "quote", @@ -4985,6 +5065,20 @@ name = "zeroize" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] [[package]] name = "zerotrie" diff --git a/Dockerfile b/Dockerfile index 464ad86..f53033e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,6 +4,7 @@ FROM rust:slim-bookworm AS builder WORKDIR /build # Cache dependency compilation separately from source +COPY .cargo/ .cargo/ COPY Cargo.toml Cargo.lock ./ COPY crates/adapters/activitypub/Cargo.toml crates/adapters/activitypub/Cargo.toml COPY crates/adapters/auth/Cargo.toml crates/adapters/auth/Cargo.toml diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml index aafdbd6..45264a2 100644 --- a/crates/adapters/activitypub/Cargo.toml +++ b/crates/adapters/activitypub/Cargo.toml @@ -4,18 +4,17 @@ version = "0.1.0" edition = "2021" [dependencies] -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } -domain = { workspace = true } -url = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -anyhow = { workspace = true } -chrono = { workspace = true } -uuid = { workspace = true } -async-trait = { workspace = true } -tracing = { workspace = true } -activitypub_federation = "0.7.0-beta.11" -reqwest = { workspace = true } -futures = { workspace = true } -tokio = { workspace = true } -axum = { workspace = true } +k-ap = { version = "0.3.0", registry = "gitea" } +domain = { workspace = true } +url = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +anyhow = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +reqwest = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +axum = { workspace = true } diff --git a/crates/adapters/activitypub/src/handler.rs b/crates/adapters/activitypub/src/handler.rs index 64e0081..6e4098a 100644 --- a/crates/adapters/activitypub/src/handler.rs +++ b/crates/adapters/activitypub/src/handler.rs @@ -12,7 +12,7 @@ use crate::port::{AcceptNoteInput, ActivityPubRepository}; use crate::urls::ThoughtsUrls; use domain::ports::{EventPublisher, TagRepository}; use domain::value_objects::UserId; -use k_ap::ApObjectHandler; +use k_ap::{ApContentReader, ApObjectHandler}; pub struct ThoughtsObjectHandler { repo: Arc, @@ -37,43 +37,10 @@ impl ThoughtsObjectHandler { } } -#[async_trait] -impl ApObjectHandler for ThoughtsObjectHandler { - async fn get_local_objects_for_user( - &self, - user_id: uuid::Uuid, - ) -> Result> { - let uid = UserId::from_uuid(user_id); - let entries = self - .repo - .outbox_entries_for_actor(&uid) - .await - .map_err(|e| anyhow!("{e}"))?; - entries - .into_iter() - .map(|e| { - let note_url = self.urls.thought_url(e.thought.id.as_uuid()); - let actor_url = self.urls.user_url(&user_id.to_string()); - let followers = self.urls.user_followers(&user_id.to_string()); - let in_reply_to = e - .thought - .in_reply_to_id - .map(|id| self.urls.thought_url(id.as_uuid())); - let note = ThoughtNote::new_public(ThoughtNoteInput { - id: note_url.clone(), - actor_url, - content: e.thought.content.as_str().to_owned(), - published: e.thought.created_at, - in_reply_to, - sensitive: e.thought.sensitive, - summary: e.thought.content_warning, - followers_url: followers, - }); - Ok((note_url, serde_json::to_value(¬e)?)) - }) - .collect() - } +// ── ApContentReader ─────────────────────────────────────────────────────────── +#[async_trait] +impl ApContentReader for ThoughtsObjectHandler { async fn get_local_objects_page( &self, user_id: uuid::Uuid, @@ -112,6 +79,18 @@ impl ApObjectHandler for ThoughtsObjectHandler { .collect() } + async fn count_local_posts(&self) -> Result { + self.repo + .count_local_notes() + .await + .map_err(|e| anyhow!("{e}")) + } +} + +// ── ApObjectHandler ─────────────────────────────────────────────────────────── + +#[async_trait] +impl ApObjectHandler for ThoughtsObjectHandler { async fn on_create( &self, ap_id: &Url, @@ -128,7 +107,6 @@ impl ApObjectHandler for ThoughtsObjectHandler { .await .map_err(|e| anyhow!("{e}"))?; - // Derive visibility from AP addressing conventions. let as_public = "https://www.w3.org/ns/activitystreams#Public"; let in_to = note.to.iter().any(|s| s == as_public); let in_cc = note.cc.iter().any(|s| s == as_public); @@ -161,7 +139,6 @@ impl ApObjectHandler for ThoughtsObjectHandler { .await .map_err(|e| anyhow!("{e}"))?; - // Extract and index hashtags from the AP tag array. let hashtag_names: Vec = note .tag .iter() @@ -177,7 +154,6 @@ impl ApObjectHandler for ThoughtsObjectHandler { } } - // Fire mention notifications for local @mentions in the note's tag array. let base_url = url::Url::parse(&self.urls.base_url) .ok() .and_then(|u| u.host_str().map(|h| h.to_string())) @@ -408,10 +384,7 @@ impl ApObjectHandler for ThoughtsObjectHandler { Ok(()) } - async fn count_local_posts(&self) -> Result { - self.repo - .count_local_notes() - .await - .map_err(|e| anyhow!("{e}")) + async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> Result<()> { + Ok(()) } } diff --git a/crates/adapters/activitypub/src/service.rs b/crates/adapters/activitypub/src/service.rs index 335429c..dfd8a51 100644 --- a/crates/adapters/activitypub/src/service.rs +++ b/crates/adapters/activitypub/src/service.rs @@ -97,6 +97,17 @@ fn build_note_json( note } +fn thought_to_ap_visibility( + v: &domain::models::thought::Visibility, +) -> k_ap::ApVisibility { + match v { + domain::models::thought::Visibility::Public => k_ap::ApVisibility::Public, + domain::models::thought::Visibility::Unlisted => k_ap::ApVisibility::Public, + domain::models::thought::Visibility::Followers => k_ap::ApVisibility::FollowersOnly, + domain::models::thought::Visibility::Direct => k_ap::ApVisibility::Private, + } +} + fn k_ap_actor_to_domain(a: k_ap::RemoteActor) -> DomainRemoteActor { DomainRemoteActor { url: a.url, @@ -264,7 +275,12 @@ impl crate::port::OutboundFederationPort for ApFederationAdapter { in_reply_to_url, ); self.inner - .broadcast_create_note(user_uuid, note) + .broadcast_create_note( + user_uuid, + note, + thought_to_ap_visibility(&thought.visibility), + vec![], + ) .await .map_err(|e| DomainError::Internal(e.to_string())) } @@ -300,7 +316,12 @@ impl crate::port::OutboundFederationPort for ApFederationAdapter { in_reply_to_url, ); self.inner - .broadcast_update_note(user_uuid, note) + .broadcast_update_note( + user_uuid, + note, + thought_to_ap_visibility(&thought.visibility), + vec![], + ) .await .map_err(|e| DomainError::Internal(e.to_string())) } @@ -384,7 +405,7 @@ impl FederationSchedulerPort for ApFederationAdapter { let actor = actor_ap_url.to_string(); let outbox = outbox_url.to_string(); tokio::spawn(async move { - if let Err(e) = service.backfill_outbox(&outbox, &actor).await { + if let Err(e) = service.import_remote_outbox(&outbox, &actor).await { tracing::warn!(actor = %actor, error = %e, "posts backfill failed"); } }); @@ -517,7 +538,7 @@ impl FederationLookupPort for ApFederationAdapter { last_fetched_at: chrono::Utc::now(), bio: actor.bio, banner_url: actor.banner_url.as_ref().map(|u| u.to_string()), - also_known_as: actor.also_known_as, + also_known_as: actor.also_known_as.into_iter().next(), followers_url: actor.followers_url.as_ref().map(|u| u.to_string()), following_url: actor.following_url.as_ref().map(|u| u.to_string()), attachment: actor diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index c5df516..c719659 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -88,6 +88,15 @@ pub enum EventPayload { mentioned_user_id: String, author_user_id: String, }, + FederationDeliveryRequested { + inbox: String, + activity: serde_json::Value, + signing_actor_id: String, + }, + FederationBackfillRequested { + owner_user_id: String, + follower_inbox_url: String, + }, } impl EventPayload { @@ -113,6 +122,8 @@ impl EventPayload { Self::RemoteFollowRejected { .. } => "federation.follow.rejected", Self::ActorMoved { .. } => "federation.actor.moved", Self::MentionReceived { .. } => "mentions.received", + Self::FederationDeliveryRequested { .. } => "federation.delivery.requested", + Self::FederationBackfillRequested { .. } => "federation.backfill.requested", } } } @@ -409,6 +420,12 @@ impl TryFrom for DomainEvent { )?), author_user_id: UserId::from_uuid(parse_uuid(&author_user_id, "author_user_id")?), }, + EventPayload::FederationDeliveryRequested { .. } + | EventPayload::FederationBackfillRequested { .. } => { + return Err(DomainError::Internal( + "federation infrastructure event — not a domain event".into(), + )); + } }) } } diff --git a/crates/adapters/postgres-federation/Cargo.toml b/crates/adapters/postgres-federation/Cargo.toml index 4045a21..382f52a 100644 --- a/crates/adapters/postgres-federation/Cargo.toml +++ b/crates/adapters/postgres-federation/Cargo.toml @@ -4,15 +4,15 @@ version = "0.1.0" edition = "2021" [dependencies] -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } -sqlx = { workspace = true } -uuid = { workspace = true } -chrono = { workspace = true } -tracing = { workspace = true } +k-ap = { version = "0.3.0", registry = "gitea" } +sqlx = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +tracing = { workspace = true } async-trait = { workspace = true } -anyhow = { workspace = true } -url = { workspace = true } +anyhow = { workspace = true } +url = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } -sqlx = { workspace = true, features = ["migrate"] } +sqlx = { workspace = true, features = ["migrate"] } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index 9cee64c..858b5f4 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -4,8 +4,8 @@ use chrono::{DateTime, Utc}; use sqlx::PgPool; use k_ap::{ - ApUser, ApUserRepository, BlockedDomain, FederationRepository, Follower, FollowerStatus, - FollowingStatus, RemoteActor, + ActivityRepository, ActorRepository, ApActorType, ApUser, ApUserRepository, BlockedDomain, + BlocklistRepository, Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor, }; // ── PostgresFederationRepository ───────────────────────────────────────────── @@ -55,8 +55,37 @@ fn map_remote_actor( } } +// ── ActivityRepository ──────────────────────────────────────────────────────── + #[async_trait] -impl FederationRepository for PostgresFederationRepository { +impl ActivityRepository for PostgresFederationRepository { + async fn is_activity_processed(&self, activity_id: &str) -> Result { + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_processed_activities WHERE activity_id=$1", + ) + .bind(activity_id) + .fetch_one(&self.pool) + .await + .map_err(|e| anyhow!(e))?; + Ok(n > 0) + } + + async fn mark_activity_processed(&self, activity_id: &str) -> Result<()> { + sqlx::query( + "INSERT INTO federation_processed_activities(activity_id) VALUES($1) ON CONFLICT DO NOTHING", + ) + .bind(activity_id) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) + } +} + +// ── FollowRepository ────────────────────────────────────────────────────────── + +#[async_trait] +impl FollowRepository for PostgresFederationRepository { async fn add_follower( &self, local_user_id: uuid::Uuid, @@ -68,10 +97,16 @@ impl FederationRepository for PostgresFederationRepository { "INSERT INTO federation_followers(local_user_id,remote_actor_url,status,follow_activity_id) VALUES($1,$2,$3,$4) ON CONFLICT(local_user_id,remote_actor_url) DO UPDATE - SET status=EXCLUDED.status, follow_activity_id=EXCLUDED.follow_activity_id" + SET status=EXCLUDED.status, follow_activity_id=EXCLUDED.follow_activity_id", ) - .bind(local_user_id).bind(remote_actor_url).bind(status_str(&status)).bind(follow_activity_id) - .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + .bind(local_user_id) + .bind(remote_actor_url) + .bind(status_str(&status)) + .bind(follow_activity_id) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) } async fn get_follower_follow_activity_id( @@ -80,8 +115,13 @@ impl FederationRepository for PostgresFederationRepository { remote_actor_url: &str, ) -> Result> { sqlx::query_scalar::<_, String>( - "SELECT follow_activity_id FROM federation_followers WHERE local_user_id=$1 AND remote_actor_url=$2" - ).bind(local_user_id).bind(remote_actor_url).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e)) + "SELECT follow_activity_id FROM federation_followers WHERE local_user_id=$1 AND remote_actor_url=$2", + ) + .bind(local_user_id) + .bind(remote_actor_url) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow!(e)) } async fn remove_follower( @@ -117,11 +157,28 @@ impl FederationRepository for PostgresFederationRepository { COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url FROM federation_followers f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url - WHERE f.local_user_id=$1 AND f.status='accepted'" - ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| Follower { - actor: map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url), - status: str_status(&r.status), - }).collect()) + WHERE f.local_user_id=$1 AND f.status='accepted'", + ) + .bind(local_user_id) + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|rows| { + rows.into_iter() + .map(|r| Follower { + actor: map_remote_actor( + r.remote_actor_url, + r.handle, + r.inbox_url, + r.shared_inbox_url, + r.display_name, + r.avatar_url, + r.outbox_url, + ), + status: str_status(&r.status), + }) + .collect() + }) } async fn get_followers_page( @@ -147,20 +204,126 @@ impl FederationRepository for PostgresFederationRepository { FROM federation_followers f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url WHERE f.local_user_id=$1 AND f.status='accepted' - ORDER BY f.created_at DESC LIMIT $2 OFFSET $3" - ).bind(local_user_id).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| Follower { - actor: map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url), - status: str_status(&r.status), - }).collect()) + ORDER BY f.created_at DESC LIMIT $2 OFFSET $3", + ) + .bind(local_user_id) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|rows| { + rows.into_iter() + .map(|r| Follower { + actor: map_remote_actor( + r.remote_actor_url, + r.handle, + r.inbox_url, + r.shared_inbox_url, + r.display_name, + r.avatar_url, + r.outbox_url, + ), + status: str_status(&r.status), + }) + .collect() + }) } async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result { let n: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM federation_followers WHERE local_user_id=$1 AND status='accepted'" - ).bind(local_user_id).fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; + "SELECT COUNT(*) FROM federation_followers WHERE local_user_id=$1 AND status='accepted'", + ) + .bind(local_user_id) + .fetch_one(&self.pool) + .await + .map_err(|e| anyhow!(e))?; Ok(n as usize) } + async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result { + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_followers WHERE local_user_id=$1 AND status='accepted'", + ) + .bind(local_user_id) + .fetch_one(&self.pool) + .await + .map_err(|e| anyhow!(e))?; + Ok(n as usize) + } + + async fn get_accepted_followers_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { + remote_actor_url: String, + handle: String, + inbox_url: String, + shared_inbox_url: Option, + display_name: Option, + avatar_url: Option, + outbox_url: Option, + } + sqlx::query_as::<_, Row>( + "SELECT f.remote_actor_url, COALESCE(r.handle,'') AS handle, + COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url + FROM federation_followers f + LEFT JOIN remote_actors r ON r.url=f.remote_actor_url + WHERE f.local_user_id=$1 AND f.status='accepted' + ORDER BY f.created_at DESC LIMIT $2 OFFSET $3", + ) + .bind(local_user_id) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|rows| { + rows.into_iter() + .map(|r| { + map_remote_actor( + r.remote_actor_url, + r.handle, + r.inbox_url, + r.shared_inbox_url, + r.display_name, + r.avatar_url, + r.outbox_url, + ) + }) + .collect() + }) + } + + async fn get_accepted_follower_inboxes( + &self, + local_user_id: uuid::Uuid, + ) -> Result> { + let rows: Vec = sqlx::query_scalar( + "SELECT DISTINCT COALESCE(r.shared_inbox_url, r.inbox_url) + FROM federation_followers f + JOIN remote_actors r ON r.url = f.remote_actor_url + WHERE f.local_user_id = $1 + AND f.status = 'accepted' + AND f.remote_actor_url NOT IN ( + SELECT actor_url FROM federation_blocked_actors WHERE local_user_id = $1 + ) + AND SUBSTRING(f.remote_actor_url FROM 'https?://([^/]+)') + NOT IN (SELECT domain FROM federation_blocked_domains) + AND COALESCE(r.shared_inbox_url, r.inbox_url) IS NOT NULL + AND COALESCE(r.shared_inbox_url, r.inbox_url) <> ''", + ) + .bind(local_user_id) + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow!(e))?; + Ok(rows) + } + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { #[derive(sqlx::FromRow)] struct Row { @@ -177,10 +340,27 @@ impl FederationRepository for PostgresFederationRepository { COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url FROM federation_followers f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url - WHERE f.local_user_id=$1 AND f.status='pending'" - ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| - map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) - ).collect()) + WHERE f.local_user_id=$1 AND f.status='pending'", + ) + .bind(local_user_id) + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|rows| { + rows.into_iter() + .map(|r| { + map_remote_actor( + r.remote_actor_url, + r.handle, + r.inbox_url, + r.shared_inbox_url, + r.display_name, + r.avatar_url, + r.outbox_url, + ) + }) + .collect() + }) } async fn update_follower_status( @@ -189,9 +369,16 @@ impl FederationRepository for PostgresFederationRepository { remote_actor_url: &str, status: FollowerStatus, ) -> Result<()> { - sqlx::query("UPDATE federation_followers SET status=$3 WHERE local_user_id=$1 AND remote_actor_url=$2") - .bind(local_user_id).bind(remote_actor_url).bind(status_str(&status)) - .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + sqlx::query( + "UPDATE federation_followers SET status=$3 WHERE local_user_id=$1 AND remote_actor_url=$2", + ) + .bind(local_user_id) + .bind(remote_actor_url) + .bind(status_str(&status)) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) } async fn add_following( @@ -205,10 +392,16 @@ impl FederationRepository for PostgresFederationRepository { "INSERT INTO federation_following(local_user_id,remote_actor_url,follow_activity_id,outbox_url) VALUES($1,$2,$3,$4) ON CONFLICT(local_user_id,remote_actor_url) DO UPDATE - SET follow_activity_id=EXCLUDED.follow_activity_id" + SET follow_activity_id=EXCLUDED.follow_activity_id", ) - .bind(local_user_id).bind(&actor.url).bind(follow_activity_id).bind(&actor.outbox_url) - .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + .bind(local_user_id) + .bind(&actor.url) + .bind(follow_activity_id) + .bind(&actor.outbox_url) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) } async fn get_follow_activity_id( @@ -217,8 +410,13 @@ impl FederationRepository for PostgresFederationRepository { remote_actor_url: &str, ) -> Result> { sqlx::query_scalar::<_, String>( - "SELECT follow_activity_id FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2" - ).bind(local_user_id).bind(remote_actor_url).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e)) + "SELECT follow_activity_id FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2", + ) + .bind(local_user_id) + .bind(remote_actor_url) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow!(e)) } async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { @@ -249,10 +447,27 @@ impl FederationRepository for PostgresFederationRepository { COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url FROM federation_following f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url - WHERE f.local_user_id=$1" - ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| - map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) - ).collect()) + WHERE f.local_user_id=$1", + ) + .bind(local_user_id) + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|rows| { + rows.into_iter() + .map(|r| { + map_remote_actor( + r.remote_actor_url, + r.handle, + r.inbox_url, + r.shared_inbox_url, + r.display_name, + r.avatar_url, + r.outbox_url, + ) + }) + .collect() + }) } async fn get_following_page( @@ -276,20 +491,40 @@ impl FederationRepository for PostgresFederationRepository { COALESCE(r.inbox_url,'') AS inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url FROM federation_following f LEFT JOIN remote_actors r ON r.url=f.remote_actor_url - WHERE f.local_user_id=$1 AND f.status='accepted' - ORDER BY f.created_at DESC LIMIT $2 OFFSET $3" - ).bind(local_user_id).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)).map(|rows| rows.into_iter().map(|r| - map_remote_actor(r.remote_actor_url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) - ).collect()) + WHERE f.local_user_id=$1 + ORDER BY f.created_at DESC LIMIT $2 OFFSET $3", + ) + .bind(local_user_id) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|rows| { + rows.into_iter() + .map(|r| { + map_remote_actor( + r.remote_actor_url, + r.handle, + r.inbox_url, + r.shared_inbox_url, + r.display_name, + r.avatar_url, + r.outbox_url, + ) + }) + .collect() + }) } async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { - let n: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM federation_following WHERE local_user_id=$1 AND status='accepted'") - .bind(local_user_id) - .fetch_one(&self.pool) - .await - .map_err(|e| anyhow!(e))?; + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_following WHERE local_user_id=$1", + ) + .bind(local_user_id) + .fetch_one(&self.pool) + .await + .map_err(|e| anyhow!(e))?; Ok(n as usize) } @@ -308,42 +543,51 @@ impl FederationRepository for PostgresFederationRepository { remote_actor_url: &str, ) -> Result> { sqlx::query_scalar::<_, String>( - "SELECT outbox_url FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2" - ).bind(local_user_id).bind(remote_actor_url).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e)) - } - - async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { - sqlx::query( - "INSERT INTO remote_actors(url,handle,display_name,inbox_url,shared_inbox_url,public_key,avatar_url,outbox_url,last_fetched_at) - VALUES($1,$2,$3,$4,$5,'',$6,$7,NOW()) - ON CONFLICT(url) DO UPDATE SET handle=EXCLUDED.handle,display_name=EXCLUDED.display_name, - inbox_url=EXCLUDED.inbox_url,shared_inbox_url=EXCLUDED.shared_inbox_url, - avatar_url=EXCLUDED.avatar_url,outbox_url=EXCLUDED.outbox_url,last_fetched_at=NOW()" + "SELECT outbox_url FROM federation_following WHERE local_user_id=$1 AND remote_actor_url=$2", ) - .bind(&actor.url).bind(&actor.handle).bind(&actor.display_name) - .bind(&actor.inbox_url).bind(&actor.shared_inbox_url) - .bind(&actor.avatar_url).bind(&actor.outbox_url) - .execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + .bind(local_user_id) + .bind(remote_actor_url) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow!(e)) } - async fn get_remote_actor(&self, actor_url: &str) -> Result> { - #[derive(sqlx::FromRow)] - struct Row { - url: String, - handle: String, - inbox_url: String, - shared_inbox_url: Option, - display_name: Option, - avatar_url: Option, - outbox_url: Option, - } - sqlx::query_as::<_, Row>( - "SELECT url,handle,inbox_url,shared_inbox_url,display_name,avatar_url,outbox_url FROM remote_actors WHERE url=$1" - ).bind(actor_url).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e)).map(|o| o.map(|r| - map_remote_actor(r.url, r.handle, r.inbox_url, r.shared_inbox_url, r.display_name, r.avatar_url, r.outbox_url) - )) - } + async fn migrate_follower_actor( + &self, + old_actor_url: &str, + new_actor_url: &str, + ) -> Result> { + let mut tx = self.pool.begin().await.map_err(|e| anyhow!(e))?; + let affected: Vec = sqlx::query_scalar( + "INSERT INTO federation_following(local_user_id, remote_actor_url, follow_activity_id, outbox_url) + SELECT local_user_id, $2, follow_activity_id, outbox_url + FROM federation_following + WHERE remote_actor_url = $1 + ON CONFLICT (local_user_id, remote_actor_url) DO NOTHING + RETURNING local_user_id", + ) + .bind(old_actor_url) + .bind(new_actor_url) + .fetch_all(&mut *tx) + .await + .map_err(|e| anyhow!(e))?; + + sqlx::query("DELETE FROM federation_following WHERE remote_actor_url = $1") + .bind(old_actor_url) + .execute(&mut *tx) + .await + .map_err(|e| anyhow!(e))?; + + tx.commit().await.map_err(|e| anyhow!(e))?; + Ok(affected) + } +} + +// ── ActorRepository ─────────────────────────────────────────────────────────── + +#[async_trait] +impl ActorRepository for PostgresFederationRepository { async fn get_local_actor_keypair( &self, user_id: uuid::Uuid, @@ -372,14 +616,70 @@ impl FederationRepository for PostgresFederationRepository { public_key: String, private_key: String, ) -> Result<()> { - sqlx::query("UPDATE users SET public_key=$2, private_key=$3, updated_at=NOW() WHERE id=$1") - .bind(user_id) - .bind(&public_key) - .bind(&private_key) - .execute(&self.pool) - .await - .map_err(|e| anyhow!(e)) - .map(|_| ()) + sqlx::query( + "UPDATE users SET public_key=$2, private_key=$3, updated_at=NOW() WHERE id=$1", + ) + .bind(user_id) + .bind(&public_key) + .bind(&private_key) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) + } + + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { + sqlx::query( + "INSERT INTO remote_actors(url,handle,display_name,inbox_url,shared_inbox_url,public_key,avatar_url,outbox_url,last_fetched_at) + VALUES($1,$2,$3,$4,$5,'',$6,$7,NOW()) + ON CONFLICT(url) DO UPDATE SET handle=EXCLUDED.handle,display_name=EXCLUDED.display_name, + inbox_url=EXCLUDED.inbox_url,shared_inbox_url=EXCLUDED.shared_inbox_url, + avatar_url=EXCLUDED.avatar_url,outbox_url=EXCLUDED.outbox_url,last_fetched_at=NOW()", + ) + .bind(&actor.url) + .bind(&actor.handle) + .bind(&actor.display_name) + .bind(&actor.inbox_url) + .bind(&actor.shared_inbox_url) + .bind(&actor.avatar_url) + .bind(&actor.outbox_url) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) + } + + async fn get_remote_actor(&self, actor_url: &str) -> Result> { + #[derive(sqlx::FromRow)] + struct Row { + url: String, + handle: String, + inbox_url: String, + shared_inbox_url: Option, + display_name: Option, + avatar_url: Option, + outbox_url: Option, + } + sqlx::query_as::<_, Row>( + "SELECT url,handle,inbox_url,shared_inbox_url,display_name,avatar_url,outbox_url FROM remote_actors WHERE url=$1", + ) + .bind(actor_url) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|o| { + o.map(|r| { + map_remote_actor( + r.url, + r.handle, + r.inbox_url, + r.shared_inbox_url, + r.display_name, + r.avatar_url, + r.outbox_url, + ) + }) + }) } async fn add_announce( @@ -403,20 +703,44 @@ impl FederationRepository for PostgresFederationRepository { .map(|_| ()) } - async fn count_announces(&self, object_url: &str) -> Result { - let n: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM federation_announces WHERE object_url=$1") - .bind(object_url) - .fetch_one(&self.pool) - .await - .map_err(|e| anyhow!(e))?; - Ok(n as usize) + async fn remove_announce(&self, activity_id: &str, actor_url: &str) -> Result<()> { + sqlx::query( + "DELETE FROM federation_announces WHERE activity_id=$1 AND actor_url=$2", + ) + .bind(activity_id) + .bind(actor_url) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) } + async fn count_announces(&self, object_url: &str) -> Result { + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_announces WHERE object_url=$1", + ) + .bind(object_url) + .fetch_one(&self.pool) + .await + .map_err(|e| anyhow!(e))?; + Ok(n as usize) + } +} + +// ── BlocklistRepository ─────────────────────────────────────────────────────── + +#[async_trait] +impl BlocklistRepository for PostgresFederationRepository { async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()> { sqlx::query( - "INSERT INTO federation_blocked_domains(domain,reason) VALUES($1,$2) ON CONFLICT(domain) DO NOTHING" - ).bind(domain).bind(reason).execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + "INSERT INTO federation_blocked_domains(domain,reason) VALUES($1,$2) ON CONFLICT(domain) DO NOTHING", + ) + .bind(domain) + .bind(reason) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) } async fn remove_blocked_domain(&self, domain: &str) -> Result<()> { @@ -453,77 +777,64 @@ impl FederationRepository for PostgresFederationRepository { } async fn is_domain_blocked(&self, domain: &str) -> Result { - let n: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM federation_blocked_domains WHERE domain=$1") - .bind(domain) - .fetch_one(&self.pool) - .await - .map_err(|e| anyhow!(e))?; + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM federation_blocked_domains WHERE domain=$1", + ) + .bind(domain) + .fetch_one(&self.pool) + .await + .map_err(|e| anyhow!(e))?; Ok(n > 0) } async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { sqlx::query( - "INSERT INTO federation_blocked_actors(local_user_id,actor_url) VALUES($1,$2) ON CONFLICT DO NOTHING" - ).bind(local_user_id).bind(actor_url).execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ()) + "INSERT INTO federation_blocked_actors(local_user_id,actor_url) VALUES($1,$2) ON CONFLICT DO NOTHING", + ) + .bind(local_user_id) + .bind(actor_url) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) } - async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { - sqlx::query("DELETE FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2") - .bind(local_user_id) - .bind(actor_url) - .execute(&self.pool) - .await - .map_err(|e| anyhow!(e)) - .map(|_| ()) + async fn remove_blocked_actor( + &self, + local_user_id: uuid::Uuid, + actor_url: &str, + ) -> Result<()> { + sqlx::query( + "DELETE FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2", + ) + .bind(local_user_id) + .bind(actor_url) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) } async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result> { sqlx::query_scalar::<_, String>( - "SELECT actor_url FROM federation_blocked_actors WHERE local_user_id=$1 ORDER BY created_at DESC" - ).bind(local_user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e)) + "SELECT actor_url FROM federation_blocked_actors WHERE local_user_id=$1 ORDER BY created_at DESC", + ) + .bind(local_user_id) + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow!(e)) } async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result { let n: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2" - ).bind(local_user_id).bind(actor_url).fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?; - Ok(n > 0) - } - - async fn migrate_follower_actor( - &self, - old_actor_url: &str, - new_actor_url: &str, - ) -> Result> { - let mut tx = self.pool.begin().await.map_err(|e| anyhow!(e))?; - - // Copy rows to the new actor URL, carrying over existing data. - // ON CONFLICT DO NOTHING skips users already following the new actor. - // RETURNING gives us user IDs that actually need a re-follow. - let affected: Vec = sqlx::query_scalar( - "INSERT INTO federation_following(local_user_id, remote_actor_url, follow_activity_id, outbox_url) - SELECT local_user_id, $2, follow_activity_id, outbox_url - FROM federation_following - WHERE remote_actor_url = $1 - ON CONFLICT (local_user_id, remote_actor_url) DO NOTHING - RETURNING local_user_id", + "SELECT COUNT(*) FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2", ) - .bind(old_actor_url) - .bind(new_actor_url) - .fetch_all(&mut *tx) + .bind(local_user_id) + .bind(actor_url) + .fetch_one(&self.pool) .await .map_err(|e| anyhow!(e))?; - - // Delete the old rows. - sqlx::query("DELETE FROM federation_following WHERE remote_actor_url = $1") - .bind(old_actor_url) - .execute(&mut *tx) - .await - .map_err(|e| anyhow!(e))?; - - tx.commit().await.map_err(|e| anyhow!(e))?; - Ok(affected) + Ok(n > 0) } } @@ -543,23 +854,30 @@ impl PostgresApUserRepository { &self, id: uuid::Uuid, username: String, + display_name: Option, bio: Option, avatar_url: Option, header_url: Option, also_known_as: Option, ) -> ApUser { - let profile_url = url::Url::parse(&format!("{}/users/{}", self.base_url, username)).ok(); + let profile_url = + url::Url::parse(&format!("{}/users/{}", self.base_url, username)).ok(); let avatar_url = avatar_url.and_then(|u| url::Url::parse(&u).ok()); let banner_url = header_url.and_then(|u| url::Url::parse(&u).ok()); ApUser { id, username, + display_name, bio, avatar_url, banner_url, - also_known_as, + also_known_as: also_known_as.into_iter().collect(), profile_url, attachment: vec![], + manually_approves_followers: false, + discoverable: true, + actor_type: ApActorType::default(), + featured_url: None, } } } @@ -571,13 +889,14 @@ impl ApUserRepository for PostgresApUserRepository { struct Row { id: uuid::Uuid, username: String, + display_name: Option, bio: Option, avatar_url: Option, header_url: Option, also_known_as: Option, } let row = sqlx::query_as::<_, Row>( - "SELECT id,username,bio,avatar_url,header_url,also_known_as FROM users WHERE id=$1 AND local=true", + "SELECT id,username,display_name,bio,avatar_url,header_url,also_known_as FROM users WHERE id=$1 AND local=true", ) .bind(id) .fetch_optional(&self.pool) @@ -587,6 +906,7 @@ impl ApUserRepository for PostgresApUserRepository { self.row_to_ap_user( r.id, r.username, + r.display_name, r.bio, r.avatar_url, r.header_url, @@ -600,13 +920,14 @@ impl ApUserRepository for PostgresApUserRepository { struct Row { id: uuid::Uuid, username: String, + display_name: Option, bio: Option, avatar_url: Option, header_url: Option, also_known_as: Option, } let row = sqlx::query_as::<_, Row>( - "SELECT id,username,bio,avatar_url,header_url,also_known_as FROM users WHERE username=$1 AND local=true", + "SELECT id,username,display_name,bio,avatar_url,header_url,also_known_as FROM users WHERE username=$1 AND local=true", ) .bind(username) .fetch_optional(&self.pool) @@ -616,6 +937,7 @@ impl ApUserRepository for PostgresApUserRepository { self.row_to_ap_user( r.id, r.username, + r.display_name, r.bio, r.avatar_url, r.header_url, diff --git a/crates/adapters/postgres/migrations/014_federation_processed_activities.sql b/crates/adapters/postgres/migrations/014_federation_processed_activities.sql new file mode 100644 index 0000000..d091202 --- /dev/null +++ b/crates/adapters/postgres/migrations/014_federation_processed_activities.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS federation_processed_activities ( + activity_id TEXT PRIMARY KEY, + processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_fed_processed_activities_at + ON federation_processed_activities(processed_at); diff --git a/crates/bootstrap/Cargo.toml b/crates/bootstrap/Cargo.toml index 1cee4b3..ed7f287 100644 --- a/crates/bootstrap/Cargo.toml +++ b/crates/bootstrap/Cargo.toml @@ -8,26 +8,29 @@ name = "thoughts" path = "src/main.rs" [dependencies] -presentation = { workspace = true } -domain = { workspace = true } -postgres = { workspace = true } -postgres-search = { workspace = true } +presentation = { workspace = true } +domain = { workspace = true } +postgres = { workspace = true } +postgres-search = { workspace = true } postgres-federation = { workspace = true } -activitypub = { workspace = true } -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } -nats = { workspace = true } -event-transport = { workspace = true } -auth = { workspace = true } -storage = { workspace = true } -application = { workspace = true } -sqlx = { workspace = true } -async-nats = { workspace = true } -async-trait = { workspace = true } -tokio = { workspace = true, features = ["full"] } -axum = { workspace = true } -tower-http = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -dotenvy = { workspace = true } -tower_governor = "0.8" -http = "1" +activitypub = { workspace = true } +k-ap = { version = "0.3.0", registry = "gitea" } +serde_json = { workspace = true } +anyhow = { workspace = true } +nats = { workspace = true } +event-transport = { workspace = true } +event-payload = { workspace = true } +auth = { workspace = true } +storage = { workspace = true } +application = { workspace = true } +sqlx = { workspace = true } +async-nats = { workspace = true } +async-trait = { workspace = true } +tokio = { workspace = true, features = ["full"] } +axum = { workspace = true } +tower-http = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +dotenvy = { workspace = true } +tower_governor = "0.8" +http = "1" diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index e2a6e51..27cf861 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -15,8 +15,8 @@ use domain::{ events::DomainEvent, ports::{EventPublisher, OutboxWriter}, }; -use event_transport::EventPublisherAdapter; -use k_ap::ActivityPubService; +use event_transport::{EventPublisherAdapter, Transport}; +use k_ap::{ActivityPubService, FederationEvent}; use nats::NatsTransport; use postgres::activitypub::PgActivityPubRepository; use postgres::engagement::PgEngagementRepository; @@ -42,6 +42,39 @@ impl EventPublisher for NoOpEventPublisher { } } +struct KapPublisher(NatsTransport); + +#[async_trait] +impl k_ap::data::EventPublisher for KapPublisher { + async fn publish(&self, event: FederationEvent) -> anyhow::Result<()> { + let (subject, payload) = match event { + FederationEvent::DeliveryRequested { inbox, activity, signing_actor_id } => ( + "federation.delivery.requested", + serde_json::to_vec(&event_payload::EventPayload::FederationDeliveryRequested { + inbox: inbox.to_string(), + activity, + signing_actor_id: signing_actor_id.to_string(), + })?, + ), + FederationEvent::BackfillRequested { owner_user_id, follower_inbox_url } => ( + "federation.backfill.requested", + serde_json::to_vec(&event_payload::EventPayload::FederationBackfillRequested { + owner_user_id: owner_user_id.to_string(), + follower_inbox_url, + })?, + ), + FederationEvent::DeliveryFailed { inbox, error, .. } => { + tracing::warn!(%inbox, %error, "AP delivery failed permanently"); + return Ok(()); + } + }; + self.0 + .publish_bytes(subject, &payload) + .await + .map_err(|e| anyhow::anyhow!(e)) + } +} + pub async fn build(cfg: &Config) -> Infrastructure { // 1. Database connection + migrations let pool = PgPool::connect(&cfg.database_url) @@ -54,49 +87,64 @@ pub async fn build(cfg: &Config) -> Infrastructure { tracing::info!("Database connected and migrations applied"); // 2. Event publisher — real NATS or no-op fallback - let event_publisher: Arc = match &cfg.nats_url { + let nats_client: Option = match &cfg.nats_url { Some(url) => match async_nats::connect(url).await { Ok(client) => { tracing::info!("Connected to NATS at {url}"); if let Err(e) = nats::ensure_stream(&client).await { tracing::warn!("JetStream stream setup failed: {e} — events may be lost"); } - Arc::new(EventPublisherAdapter::new(NatsTransport::new(client))) + Some(client) } Err(e) => { tracing::warn!("NATS connect failed ({e}) — falling back to no-op publisher"); - Arc::new(NoOpEventPublisher) + None } }, None => { tracing::info!("NATS_URL not set — using no-op event publisher"); - Arc::new(NoOpEventPublisher) + None } }; + let event_publisher: Arc = match &nats_client { + Some(client) => Arc::new(EventPublisherAdapter::new(NatsTransport::new(client.clone()))), + None => Arc::new(NoOpEventPublisher), + }; + let kap_publisher: Option> = nats_client + .as_ref() + .map(|c| Arc::new(KapPublisher(NatsTransport::new(c.clone()))) as _); // 3. ActivityPub federation let connections_repo = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); - let raw_ap_service = Arc::new( - ActivityPubService::builder( - Arc::new(PostgresFederationRepository::new(pool.clone())), - Arc::new(PostgresApUserRepository::new( - pool.clone(), - cfg.base_url.clone(), - )), - Arc::new(ThoughtsObjectHandler::new( - Arc::new(PgActivityPubRepository::new(pool.clone())), - &cfg.base_url, - Some(event_publisher.clone()), - Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), - )), + let fed_repo = Arc::new(PostgresFederationRepository::new(pool.clone())); + let ap_handler = Arc::new(ThoughtsObjectHandler::new( + Arc::new(PgActivityPubRepository::new(pool.clone())), + &cfg.base_url, + Some(event_publisher.clone()), + Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), + )); + let mut ap_builder = ActivityPubService::builder(cfg.base_url.clone()) + .activity_repo(fed_repo.clone()) + .follow_repo(fed_repo.clone()) + .actor_repo(fed_repo.clone()) + .blocklist_repo(fed_repo.clone()) + .user_repo(Arc::new(PostgresApUserRepository::new( + pool.clone(), cfg.base_url.clone(), - ) + ))) + .content_reader(ap_handler.clone()) + .object_handler(ap_handler) .allow_registration(cfg.allow_registration) .software_name("thoughts") - .debug(cfg.debug) - .build() - .await - .expect("Failed to build ActivityPubService"), + .debug(cfg.debug); + if let Some(publisher) = kap_publisher { + ap_builder = ap_builder.event_publisher(publisher); + } + let raw_ap_service = Arc::new( + ap_builder + .build() + .await + .expect("Failed to build ActivityPubService"), ); let ap_service = Arc::new(ApFederationAdapter::new(raw_ap_service, connections_repo)); diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 2993da0..825d5a6 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -8,23 +8,25 @@ name = "thoughts-worker" path = "src/main.rs" [dependencies] -domain = { workspace = true } -application = { workspace = true } -nats = { workspace = true } -event-transport = { workspace = true } -event-payload = { workspace = true } -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } -activitypub = { workspace = true } -postgres = { workspace = true } +domain = { workspace = true } +application = { workspace = true } +nats = { workspace = true } +event-transport = { workspace = true } +event-payload = { workspace = true } +k-ap = { version = "0.3.0", registry = "gitea" } +activitypub = { workspace = true } +postgres = { workspace = true } postgres-federation = { workspace = true } -async-nats = { workspace = true } -tokio = { workspace = true, features = ["full"] } -futures = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -dotenvy = { workspace = true } -serde_json = { workspace = true } -sqlx = { workspace = true } +async-nats = { workspace = true } +tokio = { workspace = true, features = ["full"] } +futures = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +dotenvy = { workspace = true } +serde_json = { workspace = true } +sqlx = { workspace = true } +url = { workspace = true } +uuid = { workspace = true } [dev-dependencies] domain = { workspace = true, features = ["test-helpers"] } diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index d7437d0..e61e195 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -23,10 +23,11 @@ pub struct WorkerHandlers { pub struct WorkerInfra { pub pool: PgPool, - pub consumer: event_transport::EventConsumerAdapter, + pub message_source: nats::NatsMessageSource, pub handlers: WorkerHandlers, pub dlq_store: Arc, pub event_publisher: Arc, + pub raw_ap_service: Arc, } pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> WorkerInfra { @@ -43,28 +44,32 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker // ActivityPub service (for federation fan-out) let connections_repo_worker = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); + let fed_repo_worker = Arc::new(PostgresFederationRepository::new(pool.clone())); + let ap_handler_worker = Arc::new(ThoughtsObjectHandler::new( + Arc::new(PgActivityPubRepository::new(pool.clone())), + base_url, + None, + Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), + )); let raw_ap_service = Arc::new( - ActivityPubService::builder( - Arc::new(PostgresFederationRepository::new(pool.clone())), - Arc::new(PostgresApUserRepository::new( + ActivityPubService::builder(base_url.to_string()) + .activity_repo(fed_repo_worker.clone()) + .follow_repo(fed_repo_worker.clone()) + .actor_repo(fed_repo_worker.clone()) + .blocklist_repo(fed_repo_worker.clone()) + .user_repo(Arc::new(PostgresApUserRepository::new( pool.clone(), base_url.to_string(), - )), - Arc::new(ThoughtsObjectHandler::new( - Arc::new(PgActivityPubRepository::new(pool.clone())), - base_url, - None, - Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), - )), - base_url, - ) - .software_name("thoughts") - .build() - .await - .expect("ActivityPubService build failed"), + ))) + .content_reader(ap_handler_worker.clone()) + .object_handler(ap_handler_worker) + .software_name("thoughts") + .build() + .await + .expect("ActivityPubService build failed"), ); let ap_service = Arc::new(ApFederationAdapter::new( - raw_ap_service, + raw_ap_service.clone(), connections_repo_worker, )); let ap_outbound = ap_service.clone() as Arc; @@ -110,18 +115,17 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker nats::ensure_stream(&nats_client) .await .expect("JetStream stream setup failed"); - let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new( - nats_client.clone(), - )); + let message_source = nats::NatsMessageSource::new(nats_client.clone()); let event_publisher: Arc = Arc::new( event_transport::EventPublisherAdapter::new(nats::NatsTransport::new(nats_client)), ); WorkerInfra { pool, - consumer, + message_source, handlers, dlq_store, event_publisher, + raw_ap_service, } } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 399566b..b444ad7 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -3,9 +3,13 @@ mod factory; mod handlers; mod outbox_relay; -use domain::ports::EventConsumer; +use domain::{errors::DomainError, events::DomainEvent}; +use event_payload::EventPayload; +use event_transport::MessageSource; use futures::StreamExt; use nats::CONSUMER_MAX_DELIVER; +use url::Url; +use uuid::Uuid; #[tokio::main] async fn main() { @@ -21,13 +25,11 @@ async fn main() { tracing::info!("Building worker..."); let infra = factory::build(&database_url, &base_url, &nats_url).await; - // Spawn DLQ processor as a background task. tokio::spawn(dlq::run_dlq_processor( infra.dlq_store.clone(), infra.event_publisher.clone(), )); - // Spawn outbox relay — polls DB for undelivered events and publishes them. tokio::spawn( outbox_relay::OutboxRelay { pool: infra.pool.clone(), @@ -38,71 +40,123 @@ async fn main() { ); tracing::info!("Worker started, consuming events..."); - let mut stream = infra.consumer.consume(); + let mut stream = infra.message_source.messages(); while let Some(result) = stream.next().await { match result { - Ok(envelope) => { - let event = &envelope.event; - let event_type = event_payload::EventPayload::from(event).subject(); - tracing::info!( - event_type, - delivery = envelope.delivery_count, - "received event" - ); - - let n = infra.handlers.notification.handle(event).await; - let f = infra.handlers.federation.handle(event).await; - let fm = infra.handlers.federation_management.handle(event).await; - - if n.is_ok() && f.is_ok() && fm.is_ok() { - (envelope.ack)(); - tracing::info!(event_type, "event handled ok"); - } else { - if let Err(e) = &n { - tracing::error!("notification handler: {e}"); - } - if let Err(e) = &f { - tracing::error!("federation handler: {e}"); - } - if let Err(e) = &fm { - tracing::error!("federation management handler: {e}"); + Err(e) => tracing::error!("consumer error: {e}"), + Ok(raw) => { + let payload = match serde_json::from_slice::(&raw.payload) { + Ok(p) => p, + Err(e) => { + tracing::warn!("failed to deserialize event payload — acking: {e}"); + (raw.ack)(); + continue; } + }; - // Last delivery attempt -> move to DLQ then ack. - // Earlier attempts -> nack so NATS retries. - if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 { - let error_msg = n - .err() - .or(f.err()) - .or(fm.err()) - .map(|e| e.to_string()) - .unwrap_or_else(|| "unknown error".into()); + let event_type = payload.subject(); + tracing::info!(event_type, delivery = raw.delivery_count, "received event"); - // Serialize event back to payload for storage. - let ep = event_payload::EventPayload::from(event); - let event_type = ep.subject().to_string(); - let payload = serde_json::to_value(&ep).unwrap_or(serde_json::Value::Null); - - if let Err(e) = infra - .dlq_store - .insert(&event_type, &payload, &error_msg) - .await - { - tracing::error!("DLQ insert failed: {e} — message lost"); - } else { - tracing::warn!( - event_type, - delivery_count = envelope.delivery_count, - "event exhausted — moved to DLQ" - ); + let outcome: Result<(), DomainError> = match payload { + // ── k-ap federation events ──────────────────────────── + EventPayload::FederationDeliveryRequested { + inbox, + activity, + signing_actor_id, + } => { + let result = async { + let inbox_url = Url::parse(&inbox) + .map_err(|e| DomainError::Internal(e.to_string()))?; + let actor_id = Uuid::parse_str(&signing_actor_id) + .map_err(|e| DomainError::Internal(e.to_string()))?; + infra + .raw_ap_service + .deliver_to_inbox(inbox_url, activity, actor_id) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + .await; + result + } + EventPayload::FederationBackfillRequested { + owner_user_id, + follower_inbox_url, + } => { + let result = async { + let owner_id = Uuid::parse_str(&owner_user_id) + .map_err(|e| DomainError::Internal(e.to_string()))?; + infra + .raw_ap_service + .run_backfill_for_follower(owner_id, follower_inbox_url) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } + .await; + result + } + + // ── domain events ────────────────────────────────────── + p => match DomainEvent::try_from(p) { + Err(e) => { + tracing::warn!("unknown event type — acking: {e}"); + (raw.ack)(); + continue; + } + Ok(event) => { + let n = infra.handlers.notification.handle(&event).await; + let f = infra.handlers.federation.handle(&event).await; + let fm = infra.handlers.federation_management.handle(&event).await; + match (n, f, fm) { + (Ok(()), Ok(()), Ok(())) => Ok(()), + (n, f, fm) => { + if let Err(e) = &n { + tracing::error!("notification handler: {e}"); + } + if let Err(e) = &f { + tracing::error!("federation handler: {e}"); + } + if let Err(e) = &fm { + tracing::error!("federation management handler: {e}"); + } + Err(n.err().or(f.err()).or(fm.err()).unwrap()) + } + } + } + }, + }; + + match outcome { + Ok(()) => { + (raw.ack)(); + tracing::info!(event_type, "event handled ok"); + } + Err(e) => { + if raw.delivery_count >= CONSUMER_MAX_DELIVER as u64 { + // Rebuild payload from raw bytes for DLQ storage. + let payload_val = serde_json::from_slice::( + &raw.payload, + ) + .unwrap_or(serde_json::Value::Null); + if let Err(dlq_err) = infra + .dlq_store + .insert(event_type, &payload_val, &e.to_string()) + .await + { + tracing::error!("DLQ insert failed: {dlq_err} — message lost"); + } else { + tracing::warn!( + event_type, + delivery_count = raw.delivery_count, + "event exhausted — moved to DLQ" + ); + } + (raw.ack)(); + } else { + (raw.nack)(); } - (envelope.ack)(); // ack from NATS — DLQ owns it now - } else { - (envelope.nack)(); } } } - Err(e) => tracing::error!("consumer error: {e}"), } } }