From df717488977ba478652c513716a39b82213b6906 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 8 May 2026 21:26:50 +0200 Subject: [PATCH] activity-pub implementation --- ...fc85702d8c39c6301839de7b4d27f4a4d41b.json} | 12 +- ...194094e10122ea596e8d9323968e600635a9.json} | 10 +- ...499a56832d1c90eac2f5ba06b019c3f86541.json} | 18 +- ...3adaa4ce73fc037e2bf2a87d60187aeb7361.json} | 12 +- ...45b386fc7dc6cf3e72c924e2f5da49dfb469.json} | 12 +- ...467a36d3920ff4c020f5461b2cdb04361888.json} | 12 +- ...8225e6e18e6079644b949258a39bf4b0fe3e5.json | 12 - ...b8fec11fe868b940faee5a351795caaa2357.json} | 12 +- ...54745b3331b84ce9a2178f7812f1ed7262cc.json} | 12 +- ...f28e1d55234a242a7ddba50db13cf73b488d.json} | 12 +- ...c31fddcf182850782e9bfbc4ff3ee8b7d4bb.json} | 12 +- ...897074c8ff4cdd1d3e9a13ef4b9dd5346d12a.json | 12 + Cargo.lock | 452 +++++++++++++++++- Cargo.toml | 7 +- crates/adapters/activitypub/Cargo.toml | 24 + crates/adapters/activitypub/src/activities.rs | 314 ++++++++++++ .../adapters/activitypub/src/actor_handler.rs | 24 + crates/adapters/activitypub/src/actors.rs | 258 ++++++++++ crates/adapters/activitypub/src/data.rs | 35 ++ crates/adapters/activitypub/src/error.rs | 36 ++ .../adapters/activitypub/src/event_handler.rs | 127 +++++ crates/adapters/activitypub/src/federation.rs | 26 + .../activitypub/src/followers_handler.rs | 62 +++ crates/adapters/activitypub/src/inbox.rs | 20 + crates/adapters/activitypub/src/lib.rs | 21 + crates/adapters/activitypub/src/objects.rs | 137 ++++++ crates/adapters/activitypub/src/outbox.rs | 46 ++ crates/adapters/activitypub/src/repository.rs | 43 ++ crates/adapters/activitypub/src/service.rs | 188 ++++++++ crates/adapters/activitypub/src/webfinger.rs | 48 ++ crates/adapters/sqlite/Cargo.toml | 2 + .../sqlite/migrations/0003_activitypub.sql | 28 ++ crates/adapters/sqlite/src/federation.rs | 332 +++++++++++++ crates/adapters/sqlite/src/lib.rs | 35 +- crates/adapters/sqlite/src/models.rs | 13 +- crates/adapters/template-askama/src/lib.rs | 42 +- .../templates/activity_feed.html | 5 + .../template-askama/templates/following.html | 26 + .../template-askama/templates/profile.html | 14 + crates/application/src/ports.rs | 17 + crates/domain/src/models/mod.rs | 19 + crates/presentation/Cargo.toml | 2 + crates/presentation/src/dtos.rs | 11 + crates/presentation/src/extractors.rs | 44 +- crates/presentation/src/handlers.rs | 104 +++- crates/presentation/src/main.rs | 21 +- crates/presentation/src/routes.rs | 29 ++ crates/presentation/src/state.rs | 2 + crates/presentation/tests/api_test.rs | 27 ++ static/style.css | 32 ++ 50 files changed, 2724 insertions(+), 97 deletions(-) rename .sqlx/{query-47f7cf95ce3450635b643ab710cadba96f40319140834d510bc5207b2552e055.json => query-05d958c1fa38095ae2b5b81ede48fc85702d8c39c6301839de7b4d27f4a4d41b.json} (79%) rename .sqlx/{query-217854179b4f77897178e6cfae51fb743e5be49ffc59826509be37a7cc81b6ee.json => query-106b5b65162314c47217c26b7e89194094e10122ea596e8d9323968e600635a9.json} (81%) rename .sqlx/{query-8d144859b397a842118c2dc4ab30e74015a814ed8185b6f86fbe39e641ab804e.json => query-1d62f367cd14f6fa82aff8aa289e499a56832d1c90eac2f5ba06b019c3f86541.json} (73%) rename .sqlx/{query-026e2afeb573707cb360fcdab8f6137aabfaf603b5ed57b98ac2888b4a0389ff.json => query-25fd01355c929a83daf2c802b8ae3adaa4ce73fc037e2bf2a87d60187aeb7361.json} (80%) rename .sqlx/{query-a3f4385bac7f78a9959648fb325d37096c87859ded1762137ce745955f46830c.json => query-4ad3b7d450e46e8d4982a0517c7345b386fc7dc6cf3e72c924e2f5da49dfb469.json} (79%) rename .sqlx/{query-5a861b5a934c9831ff17d896fa48feb95e6dab051c5ac55a66f9793482522199.json => query-4c235060b78cc5c73e0400b39472467a36d3920ff4c020f5461b2cdb04361888.json} (79%) delete mode 100644 .sqlx/query-630e092fcd33bc312befef352a98225e6e18e6079644b949258a39bf4b0fe3e5.json rename .sqlx/{query-70ee6050284475b5641af712e5923ba2091b8b70b1885ca6518dfa4bb01fdac2.json => query-7ff439f22f880f999a72aad1359eb8fec11fe868b940faee5a351795caaa2357.json} (75%) rename .sqlx/{query-01a08873b7fa815ad98a56a0902b60414cfcdc2c7a8570351320c4bc425347c6.json => query-8a70f21c39d203867c06dc0bf74a54745b3331b84ce9a2178f7812f1ed7262cc.json} (79%) rename .sqlx/{query-affe1eb261283c09d4b1ce6e684681755f079a044ffec8ff2bd79cfd8efe16b8.json => query-a7c424c26663e4e51b1c563fa977f28e1d55234a242a7ddba50db13cf73b488d.json} (80%) rename .sqlx/{query-af883f8b78f185077e2d3dcfaa0a6e62fbdfbf00c97c9b33b699dc631476181d.json => query-ae983138ad90fda3794b784fbf62c31fddcf182850782e9bfbc4ff3ee8b7d4bb.json} (76%) create mode 100644 .sqlx/query-cca022ac6275f2b1aaf63a14420897074c8ff4cdd1d3e9a13ef4b9dd5346d12a.json create mode 100644 crates/adapters/activitypub/Cargo.toml create mode 100644 crates/adapters/activitypub/src/activities.rs create mode 100644 crates/adapters/activitypub/src/actor_handler.rs create mode 100644 crates/adapters/activitypub/src/actors.rs create mode 100644 crates/adapters/activitypub/src/data.rs create mode 100644 crates/adapters/activitypub/src/error.rs create mode 100644 crates/adapters/activitypub/src/event_handler.rs create mode 100644 crates/adapters/activitypub/src/federation.rs create mode 100644 crates/adapters/activitypub/src/followers_handler.rs create mode 100644 crates/adapters/activitypub/src/inbox.rs create mode 100644 crates/adapters/activitypub/src/lib.rs create mode 100644 crates/adapters/activitypub/src/objects.rs create mode 100644 crates/adapters/activitypub/src/outbox.rs create mode 100644 crates/adapters/activitypub/src/repository.rs create mode 100644 crates/adapters/activitypub/src/service.rs create mode 100644 crates/adapters/activitypub/src/webfinger.rs create mode 100644 crates/adapters/sqlite/migrations/0003_activitypub.sql create mode 100644 crates/adapters/sqlite/src/federation.rs create mode 100644 crates/adapters/template-askama/templates/following.html diff --git a/.sqlx/query-47f7cf95ce3450635b643ab710cadba96f40319140834d510bc5207b2552e055.json b/.sqlx/query-05d958c1fa38095ae2b5b81ede48fc85702d8c39c6301839de7b4d27f4a4d41b.json similarity index 79% rename from .sqlx/query-47f7cf95ce3450635b643ab710cadba96f40319140834d510bc5207b2552e055.json rename to .sqlx/query-05d958c1fa38095ae2b5b81ede48fc85702d8c39c6301839de7b4d27f4a4d41b.json index 8db3f5e..35fb040 100644 --- a/.sqlx/query-47f7cf95ce3450635b643ab710cadba96f40319140834d510bc5207b2552e055.json +++ b/.sqlx/query-05d958c1fa38095ae2b5b81ede48fc85702d8c39c6301839de7b4d27f4a4d41b.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.movie_id = ?\n ORDER BY r.watched_at DESC\n LIMIT ? OFFSET ?", + "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.movie_id = ?\n ORDER BY r.watched_at DESC\n LIMIT ? OFFSET ?", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "name": "created_at", "ordinal": 12, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 13, + "type_info": "Text" } ], "parameters": { @@ -85,8 +90,9 @@ false, true, false, - false + false, + true ] }, - "hash": "47f7cf95ce3450635b643ab710cadba96f40319140834d510bc5207b2552e055" + "hash": "05d958c1fa38095ae2b5b81ede48fc85702d8c39c6301839de7b4d27f4a4d41b" } diff --git a/.sqlx/query-217854179b4f77897178e6cfae51fb743e5be49ffc59826509be37a7cc81b6ee.json b/.sqlx/query-106b5b65162314c47217c26b7e89194094e10122ea596e8d9323968e600635a9.json similarity index 81% rename from .sqlx/query-217854179b4f77897178e6cfae51fb743e5be49ffc59826509be37a7cc81b6ee.json rename to .sqlx/query-106b5b65162314c47217c26b7e89194094e10122ea596e8d9323968e600635a9.json index dda6f89..744f12a 100644 --- a/.sqlx/query-217854179b4f77897178e6cfae51fb743e5be49ffc59826509be37a7cc81b6ee.json +++ b/.sqlx/query-106b5b65162314c47217c26b7e89194094e10122ea596e8d9323968e600635a9.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at,\n u.email AS user_email\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n INNER JOIN users u ON u.id = r.user_id\n ORDER BY r.watched_at DESC\n LIMIT ? OFFSET ?", + "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.user_id = ?\n ORDER BY r.watched_at DESC", "describe": { "columns": [ { @@ -69,13 +69,13 @@ "type_info": "Text" }, { - "name": "user_email", + "name": "remote_actor_url", "ordinal": 13, "type_info": "Text" } ], "parameters": { - "Right": 2 + "Right": 1 }, "nullable": [ false, @@ -91,8 +91,8 @@ true, false, false, - false + true ] }, - "hash": "217854179b4f77897178e6cfae51fb743e5be49ffc59826509be37a7cc81b6ee" + "hash": "106b5b65162314c47217c26b7e89194094e10122ea596e8d9323968e600635a9" } diff --git a/.sqlx/query-8d144859b397a842118c2dc4ab30e74015a814ed8185b6f86fbe39e641ab804e.json b/.sqlx/query-1d62f367cd14f6fa82aff8aa289e499a56832d1c90eac2f5ba06b019c3f86541.json similarity index 73% rename from .sqlx/query-8d144859b397a842118c2dc4ab30e74015a814ed8185b6f86fbe39e641ab804e.json rename to .sqlx/query-1d62f367cd14f6fa82aff8aa289e499a56832d1c90eac2f5ba06b019c3f86541.json index 96480e2..aade0da 100644 --- a/.sqlx/query-8d144859b397a842118c2dc4ab30e74015a814ed8185b6f86fbe39e641ab804e.json +++ b/.sqlx/query-1d62f367cd14f6fa82aff8aa289e499a56832d1c90eac2f5ba06b019c3f86541.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.user_id = ?\n ORDER BY r.watched_at DESC", + "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url,\n u.email AS user_email\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n INNER JOIN users u ON u.id = r.user_id\n ORDER BY r.watched_at DESC\n LIMIT ? OFFSET ?", "describe": { "columns": [ { @@ -67,10 +67,20 @@ "name": "created_at", "ordinal": 12, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 13, + "type_info": "Text" + }, + { + "name": "user_email", + "ordinal": 14, + "type_info": "Text" } ], "parameters": { - "Right": 1 + "Right": 2 }, "nullable": [ false, @@ -85,8 +95,10 @@ false, true, false, + false, + true, false ] }, - "hash": "8d144859b397a842118c2dc4ab30e74015a814ed8185b6f86fbe39e641ab804e" + "hash": "1d62f367cd14f6fa82aff8aa289e499a56832d1c90eac2f5ba06b019c3f86541" } diff --git a/.sqlx/query-026e2afeb573707cb360fcdab8f6137aabfaf603b5ed57b98ac2888b4a0389ff.json b/.sqlx/query-25fd01355c929a83daf2c802b8ae3adaa4ce73fc037e2bf2a87d60187aeb7361.json similarity index 80% rename from .sqlx/query-026e2afeb573707cb360fcdab8f6137aabfaf603b5ed57b98ac2888b4a0389ff.json rename to .sqlx/query-25fd01355c929a83daf2c802b8ae3adaa4ce73fc037e2bf2a87d60187aeb7361.json index 58175b3..6e4083e 100644 --- a/.sqlx/query-026e2afeb573707cb360fcdab8f6137aabfaf603b5ed57b98ac2888b4a0389ff.json +++ b/.sqlx/query-25fd01355c929a83daf2c802b8ae3adaa4ce73fc037e2bf2a87d60187aeb7361.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n ORDER BY r.watched_at DESC\n LIMIT ? OFFSET ?", + "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n ORDER BY r.watched_at ASC\n LIMIT ? OFFSET ?", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "name": "created_at", "ordinal": 12, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 13, + "type_info": "Text" } ], "parameters": { @@ -85,8 +90,9 @@ false, true, false, - false + false, + true ] }, - "hash": "026e2afeb573707cb360fcdab8f6137aabfaf603b5ed57b98ac2888b4a0389ff" + "hash": "25fd01355c929a83daf2c802b8ae3adaa4ce73fc037e2bf2a87d60187aeb7361" } diff --git a/.sqlx/query-a3f4385bac7f78a9959648fb325d37096c87859ded1762137ce745955f46830c.json b/.sqlx/query-4ad3b7d450e46e8d4982a0517c7345b386fc7dc6cf3e72c924e2f5da49dfb469.json similarity index 79% rename from .sqlx/query-a3f4385bac7f78a9959648fb325d37096c87859ded1762137ce745955f46830c.json rename to .sqlx/query-4ad3b7d450e46e8d4982a0517c7345b386fc7dc6cf3e72c924e2f5da49dfb469.json index 50b7c83..7b54061 100644 --- a/.sqlx/query-a3f4385bac7f78a9959648fb325d37096c87859ded1762137ce745955f46830c.json +++ b/.sqlx/query-4ad3b7d450e46e8d4982a0517c7345b386fc7dc6cf3e72c924e2f5da49dfb469.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.user_id = ?\n ORDER BY r.rating DESC, r.watched_at DESC\n LIMIT ? OFFSET ?", + "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.user_id = ?\n ORDER BY r.rating DESC, r.watched_at DESC\n LIMIT ? OFFSET ?", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "name": "created_at", "ordinal": 12, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 13, + "type_info": "Text" } ], "parameters": { @@ -85,8 +90,9 @@ false, true, false, - false + false, + true ] }, - "hash": "a3f4385bac7f78a9959648fb325d37096c87859ded1762137ce745955f46830c" + "hash": "4ad3b7d450e46e8d4982a0517c7345b386fc7dc6cf3e72c924e2f5da49dfb469" } diff --git a/.sqlx/query-5a861b5a934c9831ff17d896fa48feb95e6dab051c5ac55a66f9793482522199.json b/.sqlx/query-4c235060b78cc5c73e0400b39472467a36d3920ff4c020f5461b2cdb04361888.json similarity index 79% rename from .sqlx/query-5a861b5a934c9831ff17d896fa48feb95e6dab051c5ac55a66f9793482522199.json rename to .sqlx/query-4c235060b78cc5c73e0400b39472467a36d3920ff4c020f5461b2cdb04361888.json index eeff415..a737e06 100644 --- a/.sqlx/query-5a861b5a934c9831ff17d896fa48feb95e6dab051c5ac55a66f9793482522199.json +++ b/.sqlx/query-4c235060b78cc5c73e0400b39472467a36d3920ff4c020f5461b2cdb04361888.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.user_id = ?\n ORDER BY r.watched_at DESC\n LIMIT ? OFFSET ?", + "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.user_id = ?\n ORDER BY r.watched_at DESC\n LIMIT ? OFFSET ?", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "name": "created_at", "ordinal": 12, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 13, + "type_info": "Text" } ], "parameters": { @@ -85,8 +90,9 @@ false, true, false, - false + false, + true ] }, - "hash": "5a861b5a934c9831ff17d896fa48feb95e6dab051c5ac55a66f9793482522199" + "hash": "4c235060b78cc5c73e0400b39472467a36d3920ff4c020f5461b2cdb04361888" } diff --git a/.sqlx/query-630e092fcd33bc312befef352a98225e6e18e6079644b949258a39bf4b0fe3e5.json b/.sqlx/query-630e092fcd33bc312befef352a98225e6e18e6079644b949258a39bf4b0fe3e5.json deleted file mode 100644 index 8b41a06..0000000 --- a/.sqlx/query-630e092fcd33bc312befef352a98225e6e18e6079644b949258a39bf4b0fe3e5.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at)\n VALUES (?, ?, ?, ?, ?, ?, ?)", - "describe": { - "columns": [], - "parameters": { - "Right": 7 - }, - "nullable": [] - }, - "hash": "630e092fcd33bc312befef352a98225e6e18e6079644b949258a39bf4b0fe3e5" -} diff --git a/.sqlx/query-70ee6050284475b5641af712e5923ba2091b8b70b1885ca6518dfa4bb01fdac2.json b/.sqlx/query-7ff439f22f880f999a72aad1359eb8fec11fe868b940faee5a351795caaa2357.json similarity index 75% rename from .sqlx/query-70ee6050284475b5641af712e5923ba2091b8b70b1885ca6518dfa4bb01fdac2.json rename to .sqlx/query-7ff439f22f880f999a72aad1359eb8fec11fe868b940faee5a351795caaa2357.json index 2b9f9f8..381b73f 100644 --- a/.sqlx/query-70ee6050284475b5641af712e5923ba2091b8b70b1885ca6518dfa4bb01fdac2.json +++ b/.sqlx/query-7ff439f22f880f999a72aad1359eb8fec11fe868b940faee5a351795caaa2357.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at\n FROM reviews WHERE id = ?", + "query": "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url\n FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC", "describe": { "columns": [ { @@ -37,6 +37,11 @@ "name": "created_at", "ordinal": 6, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 7, + "type_info": "Text" } ], "parameters": { @@ -49,8 +54,9 @@ false, true, false, - false + false, + true ] }, - "hash": "70ee6050284475b5641af712e5923ba2091b8b70b1885ca6518dfa4bb01fdac2" + "hash": "7ff439f22f880f999a72aad1359eb8fec11fe868b940faee5a351795caaa2357" } diff --git a/.sqlx/query-01a08873b7fa815ad98a56a0902b60414cfcdc2c7a8570351320c4bc425347c6.json b/.sqlx/query-8a70f21c39d203867c06dc0bf74a54745b3331b84ce9a2178f7812f1ed7262cc.json similarity index 79% rename from .sqlx/query-01a08873b7fa815ad98a56a0902b60414cfcdc2c7a8570351320c4bc425347c6.json rename to .sqlx/query-8a70f21c39d203867c06dc0bf74a54745b3331b84ce9a2178f7812f1ed7262cc.json index 27ed241..965ef29 100644 --- a/.sqlx/query-01a08873b7fa815ad98a56a0902b60414cfcdc2c7a8570351320c4bc425347c6.json +++ b/.sqlx/query-8a70f21c39d203867c06dc0bf74a54745b3331b84ce9a2178f7812f1ed7262cc.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.movie_id = ?\n ORDER BY r.watched_at ASC\n LIMIT ? OFFSET ?", + "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n WHERE r.movie_id = ?\n ORDER BY r.watched_at ASC\n LIMIT ? OFFSET ?", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "name": "created_at", "ordinal": 12, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 13, + "type_info": "Text" } ], "parameters": { @@ -85,8 +90,9 @@ false, true, false, - false + false, + true ] }, - "hash": "01a08873b7fa815ad98a56a0902b60414cfcdc2c7a8570351320c4bc425347c6" + "hash": "8a70f21c39d203867c06dc0bf74a54745b3331b84ce9a2178f7812f1ed7262cc" } diff --git a/.sqlx/query-affe1eb261283c09d4b1ce6e684681755f079a044ffec8ff2bd79cfd8efe16b8.json b/.sqlx/query-a7c424c26663e4e51b1c563fa977f28e1d55234a242a7ddba50db13cf73b488d.json similarity index 80% rename from .sqlx/query-affe1eb261283c09d4b1ce6e684681755f079a044ffec8ff2bd79cfd8efe16b8.json rename to .sqlx/query-a7c424c26663e4e51b1c563fa977f28e1d55234a242a7ddba50db13cf73b488d.json index 65a123c..b715b4e 100644 --- a/.sqlx/query-affe1eb261283c09d4b1ce6e684681755f079a044ffec8ff2bd79cfd8efe16b8.json +++ b/.sqlx/query-a7c424c26663e4e51b1c563fa977f28e1d55234a242a7ddba50db13cf73b488d.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n ORDER BY r.watched_at ASC\n LIMIT ? OFFSET ?", + "query": "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,\n r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url\n FROM reviews r\n INNER JOIN movies m ON m.id = r.movie_id\n ORDER BY r.watched_at DESC\n LIMIT ? OFFSET ?", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "name": "created_at", "ordinal": 12, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 13, + "type_info": "Text" } ], "parameters": { @@ -85,8 +90,9 @@ false, true, false, - false + false, + true ] }, - "hash": "affe1eb261283c09d4b1ce6e684681755f079a044ffec8ff2bd79cfd8efe16b8" + "hash": "a7c424c26663e4e51b1c563fa977f28e1d55234a242a7ddba50db13cf73b488d" } diff --git a/.sqlx/query-af883f8b78f185077e2d3dcfaa0a6e62fbdfbf00c97c9b33b699dc631476181d.json b/.sqlx/query-ae983138ad90fda3794b784fbf62c31fddcf182850782e9bfbc4ff3ee8b7d4bb.json similarity index 76% rename from .sqlx/query-af883f8b78f185077e2d3dcfaa0a6e62fbdfbf00c97c9b33b699dc631476181d.json rename to .sqlx/query-ae983138ad90fda3794b784fbf62c31fddcf182850782e9bfbc4ff3ee8b7d4bb.json index 4b94231..625de0c 100644 --- a/.sqlx/query-af883f8b78f185077e2d3dcfaa0a6e62fbdfbf00c97c9b33b699dc631476181d.json +++ b/.sqlx/query-ae983138ad90fda3794b784fbf62c31fddcf182850782e9bfbc4ff3ee8b7d4bb.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at\n FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC", + "query": "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url\n FROM reviews WHERE id = ?", "describe": { "columns": [ { @@ -37,6 +37,11 @@ "name": "created_at", "ordinal": 6, "type_info": "Text" + }, + { + "name": "remote_actor_url", + "ordinal": 7, + "type_info": "Text" } ], "parameters": { @@ -49,8 +54,9 @@ false, true, false, - false + false, + true ] }, - "hash": "af883f8b78f185077e2d3dcfaa0a6e62fbdfbf00c97c9b33b699dc631476181d" + "hash": "ae983138ad90fda3794b784fbf62c31fddcf182850782e9bfbc4ff3ee8b7d4bb" } diff --git a/.sqlx/query-cca022ac6275f2b1aaf63a14420897074c8ff4cdd1d3e9a13ef4b9dd5346d12a.json b/.sqlx/query-cca022ac6275f2b1aaf63a14420897074c8ff4cdd1d3e9a13ef4b9dd5346d12a.json new file mode 100644 index 0000000..a320185 --- /dev/null +++ b/.sqlx/query-cca022ac6275f2b1aaf63a14420897074c8ff4cdd1d3e9a13ef4b9dd5346d12a.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url)\n VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + "describe": { + "columns": [], + "parameters": { + "Right": 8 + }, + "nullable": [] + }, + "hash": "cca022ac6275f2b1aaf63a14420897074c8ff4cdd1d3e9a13ef4b9dd5346d12a" +} diff --git a/Cargo.lock b/Cargo.lock index e324cae..b950628 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,229 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "activitypub" +version = "0.1.0" +dependencies = [ + "activitypub_federation", + "anyhow", + "application", + "async-trait", + "axum", + "chrono", + "domain", + "enum_delegate", + "event-publisher", + "reqwest 0.13.3", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "activitypub_federation" +version = "0.7.0-beta.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20222f29f14358d3baeb0ffdec08f99bd0f56b6b59504f33556d97db720de748" +dependencies = [ + "activitystreams-kinds", + "actix-web", + "async-trait", + "axum", + "base64", + "bytes", + "chrono", + "derive_builder", + "dyn-clone", + "either", + "enum_delegate", + "futures", + "futures-core", + "http 0.2.12", + "http 1.4.0", + "http-signature-normalization", + "http-signature-normalization-reqwest", + "httpdate", + "itertools 0.14.0", + "moka", + "pin-project-lite", + "rand 0.8.6", + "regex", + "reqwest 0.13.3", + "reqwest-middleware", + "rsa", + "serde", + "serde_json", + "sha2", + "thiserror 2.0.18", + "tokio", + "tower", + "tracing", + "url", +] + +[[package]] +name = "activitystreams-kinds" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97dfe76efd8c0b113cc3580a6b5f4acba47662e3cfbbfcce081c9ac89798990" +dependencies = [ + "serde", + "url", +] + +[[package]] +name = "actix-codec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" +dependencies = [ + "bitflags 2.11.1", + "bytes", + "futures-core", + "futures-sink", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "actix-http" +version = "3.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93acb4a42f64936f9b8cae4a433b237599dd6eb6ed06124eb67132ef8cc90662" +dependencies = [ + "actix-codec", + "actix-rt", + "actix-service", + "actix-utils", + "bitflags 2.11.1", + "bytes", + "bytestring", + "derive_more", + "encoding_rs", + "foldhash 0.1.5", + "futures-core", + "http 0.2.12", + "httparse", + "httpdate", + "itoa", + "language-tags", + "mime", + "percent-encoding", + "pin-project-lite", + "smallvec", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "actix-router" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f8c75c51892f18d9c46150c5ac7beb81c95f78c8b83a634d49f4ca32551fe7" +dependencies = [ + "bytestring", + "cfg-if", + "http 0.2.12", + "regex-lite", + "serde", + "tracing", +] + +[[package]] +name = "actix-rt" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92589714878ca59a7626ea19734f0e07a6a875197eec751bb5d3f99e64998c63" +dependencies = [ + "futures-core", + "tokio", +] + +[[package]] +name = "actix-server" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a65064ea4a457eaf07f2fba30b4c695bf43b721790e9530d26cb6f9019ff7502" +dependencies = [ + "actix-rt", + "actix-service", + "actix-utils", + "futures-core", + "futures-util", + "mio", + "socket2 0.5.10", + "tokio", + "tracing", +] + +[[package]] +name = "actix-service" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e46f36bf0e5af44bdc4bdb36fbbd421aa98c79a9bce724e1edeb3894e10dc7f" +dependencies = [ + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "actix-utils" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8" +dependencies = [ + "local-waker", + "pin-project-lite", +] + +[[package]] +name = "actix-web" +version = "4.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff87453bc3b56e9b2b23c1cc0b1be8797184accf51d2abe0f8a33ec275d316bf" +dependencies = [ + "actix-codec", + "actix-http", + "actix-router", + "actix-rt", + "actix-server", + "actix-service", + "actix-utils", + "bytes", + "bytestring", + "cfg-if", + "derive_more", + "encoding_rs", + "foldhash 0.1.5", + "futures-core", + "futures-util", + "impl-more", + "itoa", + "language-tags", + "log", + "mime", + "once_cell", + "pin-project-lite", + "regex-lite", + "serde", + "serde_json", + "serde_urlencoded", + "smallvec", + "socket2 0.6.3", + "time", + "tracing", + "url", +] + [[package]] name = "aes" version = "0.8.4" @@ -353,7 +576,7 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http", + "http 1.4.0", "http-body", "http-body-util", "hyper", @@ -384,7 +607,7 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http", + "http 1.4.0", "http-body", "http-body-util", "mime", @@ -521,6 +744,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "bytestring" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86566c496f2f47d9b8147a4c8b02ffdb69c919fe0c2b2e7195d22cbba0e635c9" +dependencies = [ + "bytes", +] + [[package]] name = "castaway" version = "0.2.4" @@ -705,6 +937,24 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "217698eaf96b4a3f0bc4f3662aaa55bdf913cd54d7204591faa790070c6d0853" +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -934,6 +1184,7 @@ dependencies = [ "quote", "rustc_version", "syn 2.0.117", + "unicode-xid", ] [[package]] @@ -1022,6 +1273,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -1055,6 +1312,30 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66b7e2430c6dff6a955451e2cfc438f09cea1965a9d6f87f7e3b90decc014099" +[[package]] +name = "enum_delegate" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8ea75f31022cba043afe037940d73684327e915f88f62478e778c3de914cd0a" +dependencies = [ + "enum_delegate_lib", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "enum_delegate_lib" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1f6c3800b304a6be0012039e2a45a322a093539c45ab818d9e6895a39c90fe" +dependencies = [ + "proc-macro2", + "quote", + "rand 0.8.6", + "syn 1.0.109", +] + [[package]] name = "enumflags2" version = "0.7.12" @@ -1411,7 +1692,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.4.0", "indexmap", "slab", "tokio", @@ -1501,6 +1782,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.4.0" @@ -1518,7 +1810,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.4.0", ] [[package]] @@ -1529,7 +1821,7 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", + "http 1.4.0", "http-body", "pin-project-lite", ] @@ -1540,6 +1832,31 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c" +[[package]] +name = "http-signature-normalization" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95e3149194de5f3f9d5225bcc6a8677979f8ff8ce39c85654730ad4824f101e" +dependencies = [ + "httpdate", +] + +[[package]] +name = "http-signature-normalization-reqwest" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2441a67ea8984d46c95099b4a9be83dc5bed2c254b8443dc2554edaeaa7d0b" +dependencies = [ + "async-trait", + "base64", + "http-signature-normalization", + "httpdate", + "reqwest 0.13.3", + "reqwest-middleware", + "sha2", + "tokio", +] + [[package]] name = "httparse" version = "1.10.1" @@ -1569,7 +1886,7 @@ dependencies = [ "futures-channel", "futures-core", "h2", - "http", + "http 1.4.0", "http-body", "httparse", "httpdate", @@ -1586,7 +1903,7 @@ version = "0.27.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" dependencies = [ - "http", + "http 1.4.0", "hyper", "hyper-util", "rustls", @@ -1606,14 +1923,14 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", + "http 1.4.0", "http-body", "hyper", "ipnet", "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.3", "system-configuration", "tokio", "tower-service", @@ -1760,6 +2077,12 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "impl-more" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a5a9a0ff0086c7a148acb942baaabeadf9504d10400b5a05645853729b9cd2" + [[package]] name = "indexmap" version = "2.14.0" @@ -1965,6 +2288,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf36173d4167ed999940f804952e6b08197cae5ad5d572eb4db150ce8ad5d58f" +[[package]] +name = "language-tags" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" + [[package]] name = "lazy_static" version = "1.5.0" @@ -2042,6 +2371,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" +[[package]] +name = "local-waker" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487" + [[package]] name = "lock_api" version = "0.4.14" @@ -2172,6 +2507,26 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "moka" +version = "0.12.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "event-listener", + "futures-util", + "parking_lot", + "portable-atomic", + "smallvec", + "tagptr", + "uuid", +] + [[package]] name = "never" version = "0.1.0" @@ -2675,6 +3030,7 @@ dependencies = [ name = "presentation" version = "0.1.0" dependencies = [ + "activitypub", "anyhow", "application", "async-trait", @@ -2687,6 +3043,7 @@ dependencies = [ "http-body-util", "infer", "metadata", + "percent-encoding", "poster-fetcher", "poster-storage", "rss 0.1.0", @@ -2756,7 +3113,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -2794,7 +3151,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.3", "tracing", "windows-sys 0.59.0", ] @@ -3016,6 +3373,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cab834c73d247e67f4fae452806d17d3c7501756d98c8808d7c9c7aa7d18f973" + [[package]] name = "regex-syntax" version = "0.8.10" @@ -3033,7 +3396,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 1.4.0", "http-body", "http-body-util", "hyper", @@ -3060,7 +3423,7 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", + "wasm-streams 0.4.2", "web-sys", ] @@ -3074,8 +3437,9 @@ dependencies = [ "bytes", "encoding_rs", "futures-core", + "futures-util", "h2", - "http", + "http 1.4.0", "http-body", "http-body-util", "hyper", @@ -3096,15 +3460,31 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", + "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams 0.5.0", "web-sys", ] +[[package]] +name = "reqwest-middleware" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "199dda04a536b532d0cc04d7979e39b1c763ea749bf91507017069c00b96056f" +dependencies = [ + "anyhow", + "async-trait", + "http 1.4.0", + "reqwest 0.13.3", + "thiserror 2.0.18", + "tower-service", +] + [[package]] name = "ring" version = "0.17.14" @@ -3385,6 +3765,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ + "indexmap", "itoa", "memchr", "serde", @@ -3437,6 +3818,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -3574,6 +3961,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.3" @@ -3607,6 +4004,8 @@ dependencies = [ name = "sqlite" version = "0.1.0" dependencies = [ + "activitypub", + "anyhow", "async-trait", "chrono", "domain", @@ -3929,6 +4328,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.27.0" @@ -4136,7 +4541,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.3", "tokio-macros", "tracing", "windows-sys 0.61.2", @@ -4243,7 +4648,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", + "http 1.4.0", "http-body", "http-body-util", "http-range-header", @@ -4463,6 +4868,7 @@ dependencies = [ "idna", "percent-encoding", "serde", + "serde_derive", ] [[package]] @@ -4487,6 +4893,7 @@ dependencies = [ "getrandom 0.4.2", "js-sys", "serde_core", + "sha1_smol", "wasm-bindgen", ] @@ -4656,6 +5063,19 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasm-streams" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1ec4f6517c9e11ae630e200b2b65d193279042e28edd4a2cda233e46670bbb" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" diff --git a/Cargo.toml b/Cargo.toml index 37a8dc8..dd0631e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,9 +8,11 @@ members = [ "crates/adapters/rss", "crates/adapters/sqlite", "crates/adapters/template-askama", + "crates/adapters/activitypub", "crates/application", "crates/domain", - "crates/presentation", "crates/tui", + "crates/presentation", + "crates/tui", ] resolver = "2" @@ -24,7 +26,7 @@ thiserror = "2.0" tracing = "0.1" tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } async-trait = "0.1" -uuid = { version = "1.23.0", features = ["v4", "serde"] } +uuid = { version = "1.23.0", features = ["v4", "v5", "serde"] } chrono = { version = "0.4", features = ["serde"] } sqlx = { version = "0.8.6", features = [ "runtime-tokio-rustls", @@ -46,3 +48,4 @@ event-publisher = { path = "crates/adapters/event-publisher" } rss = { path = "crates/adapters/rss" } sqlite = { path = "crates/adapters/sqlite" } template-askama = { path = "crates/adapters/template-askama" } +activitypub = { path = "crates/adapters/activitypub" } diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml new file mode 100644 index 0000000..ca2a57a --- /dev/null +++ b/crates/adapters/activitypub/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "activitypub" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +application = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +reqwest = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +thiserror = { workspace = true } +anyhow = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } +event-publisher = { workspace = true } + +activitypub_federation = "0.7.0-beta.11" +url = { version = "2", features = ["serde"] } +enum_delegate = "0.2" +axum = "0.8" diff --git a/crates/adapters/activitypub/src/activities.rs b/crates/adapters/activitypub/src/activities.rs new file mode 100644 index 0000000..da9ef8e --- /dev/null +++ b/crates/adapters/activitypub/src/activities.rs @@ -0,0 +1,314 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + kinds::activity::{AcceptType, CreateType, DeleteType, FollowType, RejectType, UndoType}, + traits::{Activity, Actor, Object}, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; +use crate::objects::{DbReview, ReviewObject}; +use crate::repository::FollowerStatus; + +// --- Follow --- + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FollowActivity { + pub(crate) id: Url, + #[serde(rename = "type")] + pub(crate) kind: FollowType, + pub(crate) actor: ObjectId, + pub(crate) object: ObjectId, +} + +#[async_trait::async_trait] +impl Activity for FollowActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, data: &Data) -> Result<(), Self::Error> { + // Verify the target is a local actor + let target_url = self.object.inner(); + if target_url.domain() != Some(&data.domain) { + return Err(Error(anyhow::anyhow!( + "follow target is not a local actor" + ))); + } + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + let follower = self.actor.dereference(data).await?; + let local_actor = self.object.dereference(data).await?; + + data.federation_repo + .add_follower( + local_actor.user_id.clone(), + self.actor.inner().as_str(), + FollowerStatus::Accepted, + ) + .await?; + + // Send Accept back + let accept_id = + Url::parse(&format!("{}/activities/{}", data.base_url, uuid::Uuid::new_v4())) + .expect("valid url"); + let accept = AcceptActivity { + id: accept_id, + kind: Default::default(), + actor: self.object.clone(), + object: self.clone(), + }; + + use activitypub_federation::activity_sending::SendActivityTask; + use activitypub_federation::protocol::context::WithContext; + + let accept_with_ctx = WithContext::new_default(accept); + let sends = + SendActivityTask::prepare(&accept_with_ctx, &local_actor, vec![follower.inbox()], data) + .await?; + for send in sends { + send.sign_and_send(data).await?; + } + + tracing::info!( + follower = %self.actor.inner(), + local_user = %local_actor.user_id.value(), + "accepted follow" + ); + + Ok(()) + } +} + +// --- Accept --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AcceptActivity { + pub(crate) id: Url, + #[serde(rename = "type")] + pub(crate) kind: AcceptType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for AcceptActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, _data: &Data) -> Result<(), Self::Error> { + let remote_actor_url = self.actor.into_inner().to_string(); + tracing::info!(remote_actor_url = %remote_actor_url, "Follow accepted by remote instance"); + // TODO(ap): update ap_following to track accepted status + Ok(()) + } +} + +// --- Reject --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RejectActivity { + pub(crate) id: Url, + #[serde(rename = "type")] + pub(crate) kind: RejectType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for RejectActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + // The actor rejected our follow. Extract the local user from the original Follow's actor. + let local_user_url = self.object.actor.inner(); + let path = local_user_url.path(); + if let Some(uid_str) = path.strip_prefix("/users/").and_then(|s| s.split('/').next()) { + if let Ok(uuid) = uuid::Uuid::parse_str(uid_str) { + let user_id = domain::value_objects::UserId::from_uuid(uuid); + data.federation_repo + .remove_following(user_id, self.actor.inner().as_str()) + .await?; + } + } + tracing::info!(actor = %self.actor.inner(), "follow rejected"); + Ok(()) + } +} + +// --- Undo --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UndoActivity { + pub(crate) id: Url, + #[serde(rename = "type")] + pub(crate) kind: UndoType, + pub(crate) actor: ObjectId, + pub(crate) object: FollowActivity, +} + +#[async_trait::async_trait] +impl Activity for UndoActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + // Remote actor is unfollowing a local user + let local_user_url = self.object.object.inner(); + let path = local_user_url.path(); + if let Some(uid_str) = path.strip_prefix("/users/").and_then(|s| s.split('/').next()) { + if let Ok(uuid) = uuid::Uuid::parse_str(uid_str) { + let user_id = domain::value_objects::UserId::from_uuid(uuid); + data.federation_repo + .remove_follower(user_id, self.actor.inner().as_str()) + .await?; + } + } + tracing::info!(actor = %self.actor.inner(), "unfollowed"); + Ok(()) + } +} + +// --- Create --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateActivity { + pub(crate) id: Url, + #[serde(rename = "type")] + pub(crate) kind: CreateType, + pub(crate) actor: ObjectId, + pub(crate) object: ReviewObject, +} + +#[async_trait::async_trait] +impl Activity for CreateActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + DbReview::from_json(self.object, data).await?; + tracing::info!(actor = %self.actor.inner(), "received review"); + Ok(()) + } +} + +// --- Delete --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DeleteActivity { + pub(crate) id: Url, + #[serde(rename = "type")] + pub(crate) kind: DeleteType, + pub(crate) actor: ObjectId, + pub(crate) object: Url, +} + +#[async_trait::async_trait] +impl Activity for DeleteActivity { + type DataType = FederationData; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, _data: &Data) -> Result<(), Self::Error> { + tracing::info!(actor = %self.actor.inner(), object = %self.object, "delete received (no-op)"); + Ok(()) + } +} + +// --- Inbox dispatch enum --- + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "type")] +#[enum_delegate::implement(Activity)] +pub enum InboxActivities { + #[serde(rename = "Follow")] + Follow(FollowActivity), + #[serde(rename = "Accept")] + Accept(AcceptActivity), + #[serde(rename = "Reject")] + Reject(RejectActivity), + #[serde(rename = "Undo")] + Undo(UndoActivity), + #[serde(rename = "Create")] + Create(CreateActivity), + #[serde(rename = "Delete")] + Delete(DeleteActivity), +} diff --git a/crates/adapters/activitypub/src/actor_handler.rs b/crates/adapters/activitypub/src/actor_handler.rs new file mode 100644 index 0000000..c244372 --- /dev/null +++ b/crates/adapters/activitypub/src/actor_handler.rs @@ -0,0 +1,24 @@ +use activitypub_federation::{ + axum::json::FederationJson, config::Data, protocol::context::WithContext, traits::Object, +}; +use axum::extract::Path; + +use domain::value_objects::UserId; + +use crate::actors::{get_local_actor, Person}; +use crate::data::FederationData; +use crate::error::Error; + +pub async fn actor_handler( + Path(user_id_str): Path, + data: Data, +) -> Result>, Error> { + let uuid = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error(anyhow::anyhow!("invalid user id")))?; + let user_id = UserId::from_uuid(uuid); + + let db_actor = get_local_actor(user_id, &data).await?; + let person = db_actor.into_json(&data).await?; + + Ok(FederationJson(WithContext::new_default(person))) +} diff --git a/crates/adapters/activitypub/src/actors.rs b/crates/adapters/activitypub/src/actors.rs new file mode 100644 index 0000000..10f3fc3 --- /dev/null +++ b/crates/adapters/activitypub/src/actors.rs @@ -0,0 +1,258 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + http_signatures::generate_actor_keypair, + kinds::actor::PersonType, + protocol::{public_key::PublicKey, verification::verify_domains_match}, + traits::{Actor, Object}, +}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use domain::value_objects::UserId; + +use crate::data::FederationData; +use crate::error::Error; +use crate::repository::RemoteActor; + +#[derive(Debug, Clone)] +pub struct DbActor { + pub user_id: UserId, + pub email: String, + pub public_key_pem: String, + pub private_key_pem: Option, + pub inbox_url: Url, + pub outbox_url: Url, + pub followers_url: Url, + pub following_url: Url, + pub ap_id: Url, + pub last_refreshed_at: DateTime, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Person { + #[serde(rename = "type")] + kind: PersonType, + id: ObjectId, + preferred_username: String, + inbox: Url, + outbox: Url, + followers: Url, + following: Url, + public_key: PublicKey, + name: Option, +} + +pub fn actor_url(base_url: &str, user_id: &UserId) -> Url { + Url::parse(&format!("{}/users/{}", base_url, user_id.value())).expect("valid actor url") +} + +pub async fn get_local_actor( + user_id: UserId, + data: &Data, +) -> Result { + let user = data + .user_repo + .find_by_id(&user_id) + .await + .map_err(|e| Error(e.into()))? + .ok_or_else(|| Error(anyhow::anyhow!("user not found: {}", user_id.value())))?; + + let (public_key, private_key) = match data + .federation_repo + .get_local_actor_keypair(user_id.clone()) + .await? + { + Some(kp) => kp, + None => { + let kp = generate_actor_keypair()?; + data.federation_repo + .save_local_actor_keypair( + user_id.clone(), + kp.public_key.clone(), + kp.private_key.clone(), + ) + .await?; + (kp.public_key, kp.private_key) + } + }; + + let ap_id = actor_url(&data.base_url, user.id()); + let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid inbox url"); + let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid outbox url"); + let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid followers url"); + let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid following url"); + + Ok(DbActor { + user_id: user.id().clone(), + email: user.email().value().to_string(), + public_key_pem: public_key, + private_key_pem: Some(private_key), + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + }) +} + +#[async_trait::async_trait] +impl Object for DbActor { + type DataType = FederationData; + type Kind = Person; + type Error = Error; + + fn id(&self) -> &Url { + &self.ap_id + } + + fn last_refreshed_at(&self) -> Option> { + Some(self.last_refreshed_at) + } + + async fn read_from_id( + object_id: Url, + data: &Data, + ) -> Result, Self::Error> { + // Extract user_id from URL path: /users/{uuid} + let path = object_id.path(); + let user_id_str = path + .strip_prefix("/users/") + .and_then(|s| s.split('/').next()); + + let user_id_str = match user_id_str { + Some(s) => s, + None => return Ok(None), + }; + + let uuid = match uuid::Uuid::parse_str(user_id_str) { + Ok(u) => u, + Err(_) => return Ok(None), + }; + + let user_id = UserId::from_uuid(uuid); + let user = match data.user_repo.find_by_id(&user_id).await { + Ok(Some(u)) => u, + _ => return Ok(None), + }; + + let keypair = data + .federation_repo + .get_local_actor_keypair(user_id.clone()) + .await?; + + let (public_key, private_key) = match keypair { + Some(kp) => (kp.0, Some(kp.1)), + None => return Ok(None), + }; + + let ap_id = actor_url(&data.base_url, user.id()); + let inbox_url = Url::parse(&format!("{}/inbox", &ap_id)).expect("valid url"); + let outbox_url = Url::parse(&format!("{}/outbox", &ap_id)).expect("valid url"); + let followers_url = Url::parse(&format!("{}/followers", &ap_id)).expect("valid url"); + let following_url = Url::parse(&format!("{}/following", &ap_id)).expect("valid url"); + + Ok(Some(DbActor { + user_id: user.id().clone(), + email: user.email().value().to_string(), + public_key_pem: public_key, + private_key_pem: private_key, + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + })) + } + + async fn into_json(self, _data: &Data) -> Result { + let public_key = PublicKey { + id: format!("{}#main-key", &self.ap_id), + owner: self.ap_id.clone(), + public_key_pem: self.public_key_pem.clone(), + }; + + Ok(Person { + kind: Default::default(), + id: self.ap_id.clone().into(), + preferred_username: self + .email + .split('@') + .next() + .unwrap_or(&self.email) + .to_string(), + inbox: self.inbox_url.clone(), + outbox: self.outbox_url.clone(), + followers: self.followers_url.clone(), + following: self.following_url.clone(), + public_key, + name: Some(self.email.clone()), + }) + } + + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + _data: &Data, + ) -> Result<(), Self::Error> { + verify_domains_match(json.id.inner(), expected_domain)?; + Ok(()) + } + + async fn from_json( + json: Self::Kind, + data: &Data, + ) -> Result { + let actor = RemoteActor { + url: json.id.inner().to_string(), + handle: json.preferred_username.clone(), + inbox_url: json.inbox.to_string(), + shared_inbox_url: None, + display_name: json.name.clone(), + }; + data.federation_repo.upsert_remote_actor(actor).await?; + + // Deterministic UUID from actor URL so the same remote actor always maps to the same UserId + let url_str = json.id.inner().to_string(); + let stable_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, url_str.as_bytes()); + let user_id = UserId::from_uuid(stable_id); + let ap_id = json.id.inner().clone(); + let inbox_url = json.inbox.clone(); + let outbox_url = json.outbox.clone(); + let followers_url = json.followers.clone(); + let following_url = json.following.clone(); + + Ok(DbActor { + user_id, + email: json + .name + .unwrap_or_else(|| json.preferred_username.clone()), + public_key_pem: json.public_key.public_key_pem, + private_key_pem: None, + inbox_url, + outbox_url, + followers_url, + following_url, + ap_id, + last_refreshed_at: Utc::now(), + }) + } +} + +impl Actor for DbActor { + fn public_key_pem(&self) -> &str { + &self.public_key_pem + } + + fn private_key_pem(&self) -> Option { + self.private_key_pem.clone() + } + + fn inbox(&self) -> Url { + self.inbox_url.clone() + } +} diff --git a/crates/adapters/activitypub/src/data.rs b/crates/adapters/activitypub/src/data.rs new file mode 100644 index 0000000..c519d67 --- /dev/null +++ b/crates/adapters/activitypub/src/data.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; + +use domain::ports::UserRepository; + +use crate::repository::FederationRepository; + +#[derive(Clone)] +pub struct FederationData { + pub(crate) federation_repo: Arc, + pub(crate) user_repo: Arc, + pub(crate) base_url: String, + pub(crate) domain: String, +} + +impl FederationData { + pub fn new( + federation_repo: Arc, + user_repo: Arc, + base_url: String, + ) -> Self { + let domain = base_url + .trim_start_matches("https://") + .trim_start_matches("http://") + .split('/') + .next() + .unwrap_or("") + .to_string(); + Self { + federation_repo, + user_repo, + base_url, + domain, + } + } +} diff --git a/crates/adapters/activitypub/src/error.rs b/crates/adapters/activitypub/src/error.rs new file mode 100644 index 0000000..058692c --- /dev/null +++ b/crates/adapters/activitypub/src/error.rs @@ -0,0 +1,36 @@ +use std::fmt::{Display, Formatter}; + +#[derive(Debug)] +pub struct Error(pub(crate) anyhow::Error); + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl From for Error +where + T: Into, +{ + fn from(t: T) -> Self { + Error(t.into()) + } +} + +impl axum::response::IntoResponse for Error { + fn into_response(self) -> axum::response::Response { + let msg = self.0.to_string(); + let status = if msg.contains("not found") { + tracing::debug!(error = %msg, "AP: not found"); + (axum::http::StatusCode::NOT_FOUND, "Not found") + } else if msg.contains("invalid") || msg.contains("bad") { + tracing::debug!(error = %msg, "AP: bad request"); + (axum::http::StatusCode::BAD_REQUEST, "Bad request") + } else { + tracing::error!(error = %msg, "AP: internal error"); + (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Internal server error") + }; + status.into_response() + } +} diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs new file mode 100644 index 0000000..13f7109 --- /dev/null +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -0,0 +1,127 @@ +use activitypub_federation::{ + activity_sending::SendActivityTask, + fetch::object_id::ObjectId, + protocol::context::WithContext, +}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + errors::DomainError, + events::DomainEvent, + value_objects::{ReviewId, UserId}, +}; +use event_publisher::EventHandler; +use url::Url; + +use crate::{ + activities::CreateActivity, + actors::{actor_url, get_local_actor}, + federation::ApFederationConfig, + objects::{review_url, ReviewObject}, + repository::FollowerStatus, +}; + +pub struct ActivityPubEventHandler { + federation_config: ApFederationConfig, + base_url: String, +} + +impl ActivityPubEventHandler { + pub fn new(federation_config: ApFederationConfig, base_url: String) -> Self { + Self { + federation_config, + base_url, + } + } +} + +#[async_trait] +impl EventHandler for ActivityPubEventHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + match event { + DomainEvent::ReviewLogged { + review_id, + user_id, + rating, + watched_at, + .. + } => self + .on_review_logged(user_id, review_id, rating.value(), *watched_at) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())), + _ => Ok(()), + } + } +} + +impl ActivityPubEventHandler { + async fn on_review_logged( + &self, + user_id: &UserId, + review_id: &ReviewId, + rating: u8, + watched_at: chrono::NaiveDateTime, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + + let followers = data.federation_repo.get_followers(user_id.clone()).await?; + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let local_actor = get_local_actor(user_id.clone(), &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let review_id_url = review_url(&self.base_url, review_id); + let actor_id = actor_url(&self.base_url, user_id); + let activity_id = Url::parse(&format!( + "{}/activities/{}", + self.base_url, + uuid::Uuid::new_v4() + ))?; + + let stars = "\u{2B50}".repeat(rating as usize); + let now = DateTime::from_naive_utc_and_offset(watched_at, Utc); + + let object = ReviewObject { + kind: "Review".to_string(), + id: review_id_url.into(), + attributed_to: actor_id.into(), + content: format!("{} (movie review)", stars), + published: Utc::now(), + movie_title: "Unknown".to_string(), // TODO: fetch from MovieRepository + rating, + comment: None, + watched_at: now, + }; + + let create = CreateActivity { + id: activity_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object, + }; + let create_with_ctx = WithContext::new_default(create); + + let inboxes: Vec = accepted + .iter() + .filter_map(|f| Url::parse(&f.actor.inbox_url).ok()) + .collect(); + + let sends = + SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?; + for send in sends { + if let Err(e) = send.sign_and_send(&data).await { + tracing::warn!(error = %e, "failed to deliver activity to follower"); + } + } + + Ok(()) + } +} diff --git a/crates/adapters/activitypub/src/federation.rs b/crates/adapters/activitypub/src/federation.rs new file mode 100644 index 0000000..2e6635e --- /dev/null +++ b/crates/adapters/activitypub/src/federation.rs @@ -0,0 +1,26 @@ +use activitypub_federation::config::{Data, FederationConfig, FederationMiddleware}; + +use crate::data::FederationData; + +#[derive(Clone)] +pub struct ApFederationConfig(pub FederationConfig); + +impl ApFederationConfig { + pub async fn new(data: FederationData, debug: bool) -> anyhow::Result { + let config = FederationConfig::builder() + .domain(&data.domain) + .app_data(data) + .debug(debug) + .build() + .await?; + Ok(Self(config)) + } + + pub fn to_request_data(&self) -> Data { + self.0.to_request_data() + } + + pub fn middleware(&self) -> FederationMiddleware { + FederationMiddleware::new(self.0.clone()) + } +} diff --git a/crates/adapters/activitypub/src/followers_handler.rs b/crates/adapters/activitypub/src/followers_handler.rs new file mode 100644 index 0000000..8def073 --- /dev/null +++ b/crates/adapters/activitypub/src/followers_handler.rs @@ -0,0 +1,62 @@ +use activitypub_federation::{axum::json::FederationJson, config::Data}; +use axum::extract::Path; +use serde_json::json; + +use domain::value_objects::UserId; + +use crate::data::FederationData; +use crate::error::Error; + +pub async fn followers_handler( + Path(user_id_str): Path, + data: Data, +) -> Result, Error> { + let user_id = UserId::from_uuid( + uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error(anyhow::anyhow!("invalid user id")))?, + ); + + // verify user exists + data.user_repo + .find_by_id(&user_id) + .await + .map_err(|e| Error(e.into()))? + .ok_or_else(|| Error(anyhow::anyhow!("user not found")))?; + + let id = format!("{}/users/{}/followers", data.base_url, user_id_str); + // TODO(ap): implement pagination + Ok(FederationJson(json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": id, + "totalItems": 0, + "orderedItems": [] + }))) +} + +pub async fn following_handler( + Path(user_id_str): Path, + data: Data, +) -> Result, Error> { + let user_id = UserId::from_uuid( + uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error(anyhow::anyhow!("invalid user id")))?, + ); + + // verify user exists + data.user_repo + .find_by_id(&user_id) + .await + .map_err(|e| Error(e.into()))? + .ok_or_else(|| Error(anyhow::anyhow!("user not found")))?; + + let id = format!("{}/users/{}/following", data.base_url, user_id_str); + // TODO(ap): implement pagination + Ok(FederationJson(json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": id, + "totalItems": 0, + "orderedItems": [] + }))) +} diff --git a/crates/adapters/activitypub/src/inbox.rs b/crates/adapters/activitypub/src/inbox.rs new file mode 100644 index 0000000..acee339 --- /dev/null +++ b/crates/adapters/activitypub/src/inbox.rs @@ -0,0 +1,20 @@ +use activitypub_federation::{ + axum::inbox::{receive_activity, ActivityData}, + config::Data, + protocol::context::WithContext, +}; + +use crate::activities::InboxActivities; +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +pub async fn inbox_handler( + data: Data, + activity_data: ActivityData, +) -> Result<(), Error> { + receive_activity::, DbActor, FederationData>( + activity_data, &data, + ) + .await +} diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs new file mode 100644 index 0000000..842b225 --- /dev/null +++ b/crates/adapters/activitypub/src/lib.rs @@ -0,0 +1,21 @@ +pub mod activities; +pub mod actor_handler; +pub mod actors; +pub mod data; +pub mod error; +pub mod event_handler; +pub mod federation; +pub mod followers_handler; +pub mod inbox; +pub mod objects; +pub mod outbox; +pub mod repository; +pub mod service; +pub mod webfinger; + +pub use data::FederationData; +pub use error::Error; +pub use event_handler::ActivityPubEventHandler; +pub use federation::ApFederationConfig; +pub use repository::{FederationRepository, Follower, FollowerStatus, RemoteActor}; +pub use service::ActivityPubService; diff --git a/crates/adapters/activitypub/src/objects.rs b/crates/adapters/activitypub/src/objects.rs new file mode 100644 index 0000000..3f5bd3c --- /dev/null +++ b/crates/adapters/activitypub/src/objects.rs @@ -0,0 +1,137 @@ +use activitypub_federation::{ + config::Data, + fetch::object_id::ObjectId, + protocol::verification::verify_domains_match, + traits::Object, +}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use domain::models::{Review, ReviewSource}; +use domain::value_objects::ReviewId; + +use crate::actors::DbActor; +use crate::data::FederationData; +use crate::error::Error; + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ReviewObject { + #[serde(rename = "type")] + pub(crate) kind: String, + pub(crate) id: ObjectId, + pub(crate) attributed_to: ObjectId, + pub(crate) content: String, + pub(crate) published: DateTime, + pub(crate) movie_title: String, + pub(crate) rating: u8, + pub(crate) comment: Option, + pub(crate) watched_at: DateTime, +} + +#[derive(Debug, Clone)] +pub struct DbReview { + pub review: Review, + pub ap_id: Url, +} + +pub fn review_url(base_url: &str, review_id: &ReviewId) -> Url { + Url::parse(&format!("{}/reviews/{}", base_url, review_id.value())).expect("valid review url") +} + +#[async_trait::async_trait] +impl Object for DbReview { + type DataType = FederationData; + type Kind = ReviewObject; + type Error = Error; + + fn id(&self) -> &Url { + &self.ap_id + } + + async fn read_from_id( + _object_id: Url, + _data: &Data, + ) -> Result, Self::Error> { + // Incoming activities provide the full object; no need to dereference local reviews + Ok(None) + } + + async fn into_json(self, data: &Data) -> Result { + let r = &self.review; + let ap_id = review_url(&data.base_url, r.id()); + let actor_url = crate::actors::actor_url(&data.base_url, r.user_id()); + + let stars: String = "\u{2B50}".repeat(r.rating().value() as usize); + let comment_text = r.comment().map(|c| c.value().to_string()); + // TODO(ap): fetch movie title from MovieRepository via FederationData + let movie_title = "Unknown".to_string(); + + let fallback = match &comment_text { + Some(c) => format!("{} Watched '{}': {}", stars, movie_title, c), + None => format!("{} Watched '{}'", stars, movie_title), + }; + + Ok(ReviewObject { + kind: "Review".to_string(), + id: ap_id.into(), + attributed_to: actor_url.into(), + content: fallback, + published: DateTime::from_naive_utc_and_offset(*r.created_at(), Utc), + movie_title, + rating: r.rating().value(), + comment: comment_text, + watched_at: DateTime::from_naive_utc_and_offset(*r.watched_at(), Utc), + }) + } + + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + _data: &Data, + ) -> Result<(), Self::Error> { + verify_domains_match(json.attributed_to.inner(), expected_domain)?; + Ok(()) + } + + async fn from_json( + json: Self::Kind, + data: &Data, + ) -> Result { + let actor_url = json.attributed_to.inner().to_string(); + + let review_id = ReviewId::generate(); + // TODO(ap): create stub movie/user entries in DB so feed JOIN queries work. + // For now, use deterministic UUIDs from content hash; reviews will be orphaned in JOINs. + let movie_id_uuid = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, json.movie_title.as_bytes()); + let movie_id = domain::value_objects::MovieId::from_uuid(movie_id_uuid); + let user_id_uuid = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, actor_url.as_bytes()); + let user_id = domain::value_objects::UserId::from_uuid(user_id_uuid); + let rating = domain::value_objects::Rating::new(json.rating.min(5)) + .map_err(|e| Error(anyhow::anyhow!("{}", e)))?; + let comment = json + .comment + .map(|c| domain::value_objects::Comment::new(c)) + .transpose() + .map_err(|e| Error(anyhow::anyhow!("{}", e)))?; + let watched_at = json.watched_at.naive_utc(); + let created_at = json.published.naive_utc(); + + let review = Review::from_persistence( + review_id, + movie_id, + user_id, + rating, + comment, + watched_at, + created_at, + ReviewSource::Remote { actor_url }, + ); + + let ap_id = review_url(&data.base_url, review.id()); + data.federation_repo.save_remote_review(&review).await?; + + Ok(DbReview { review, ap_id }) + } +} diff --git a/crates/adapters/activitypub/src/outbox.rs b/crates/adapters/activitypub/src/outbox.rs new file mode 100644 index 0000000..703c9e9 --- /dev/null +++ b/crates/adapters/activitypub/src/outbox.rs @@ -0,0 +1,46 @@ +use activitypub_federation::{axum::json::FederationJson, config::Data}; +use axum::extract::Path; +use serde::{Deserialize, Serialize}; + +use domain::value_objects::UserId; + +use crate::data::FederationData; +use crate::error::Error; + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrderedCollection { + #[serde(rename = "@context")] + context: String, + #[serde(rename = "type")] + kind: String, + id: String, + total_items: u64, + ordered_items: Vec, +} + +pub async fn outbox_handler( + Path(user_id_str): Path, + data: Data, +) -> Result, Error> { + let uuid = uuid::Uuid::parse_str(&user_id_str) + .map_err(|_| Error(anyhow::anyhow!("invalid user id")))?; + let user_id = UserId::from_uuid(uuid); + + // verify user exists + data.user_repo + .find_by_id(&user_id) + .await + .map_err(|e| Error(e.into()))? + .ok_or_else(|| Error(anyhow::anyhow!("user not found")))?; + + let outbox_url = format!("{}/users/{}/outbox", data.base_url, user_id_str); + + Ok(FederationJson(OrderedCollection { + context: "https://www.w3.org/ns/activitystreams".to_string(), + kind: "OrderedCollection".to_string(), + id: outbox_url, + total_items: 0, + ordered_items: vec![], + })) +} diff --git a/crates/adapters/activitypub/src/repository.rs b/crates/adapters/activitypub/src/repository.rs new file mode 100644 index 0000000..e686312 --- /dev/null +++ b/crates/adapters/activitypub/src/repository.rs @@ -0,0 +1,43 @@ +use anyhow::Result; +use async_trait::async_trait; +use domain::models::Review; +use domain::value_objects::UserId; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FollowerStatus { + Pending, + Accepted, + Rejected, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteActor { + pub url: String, + pub handle: String, + pub inbox_url: String, + pub shared_inbox_url: Option, + pub display_name: Option, +} + +#[derive(Debug, Clone)] +pub struct Follower { + pub actor: RemoteActor, + pub status: FollowerStatus, +} + +#[async_trait] +pub trait FederationRepository: Send + Sync { + async fn add_follower(&self, local_user_id: UserId, remote_actor_url: &str, status: FollowerStatus) -> Result<()>; + async fn remove_follower(&self, local_user_id: UserId, remote_actor_url: &str) -> Result<()>; + async fn get_followers(&self, local_user_id: UserId) -> Result>; + async fn update_follower_status(&self, local_user_id: UserId, remote_actor_url: &str, status: FollowerStatus) -> Result<()>; + async fn add_following(&self, local_user_id: UserId, actor: RemoteActor) -> Result<()>; + async fn remove_following(&self, local_user_id: UserId, actor_url: &str) -> Result<()>; + async fn get_following(&self, local_user_id: UserId) -> Result>; + async fn count_following(&self, local_user_id: UserId) -> Result; + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()>; + async fn get_remote_actor(&self, actor_url: &str) -> Result>; + async fn save_remote_review(&self, review: &Review) -> Result<()>; + async fn get_local_actor_keypair(&self, user_id: UserId) -> Result>; + async fn save_local_actor_keypair(&self, user_id: UserId, public_key: String, private_key: String) -> Result<()>; +} diff --git a/crates/adapters/activitypub/src/service.rs b/crates/adapters/activitypub/src/service.rs new file mode 100644 index 0000000..d95af79 --- /dev/null +++ b/crates/adapters/activitypub/src/service.rs @@ -0,0 +1,188 @@ +use std::sync::Arc; + +use activitypub_federation::{ + activity_sending::SendActivityTask, + config::Data, + fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor}, + protocol::context::WithContext, + traits::Actor, +}; +use axum::{routing::get, routing::post, Router}; +use domain::{ports::UserRepository, value_objects::UserId}; +use url::Url; + +use crate::{ + activities::{FollowActivity, UndoActivity}, + actor_handler::actor_handler, + actors::{get_local_actor, DbActor}, + data::FederationData, + event_handler::ActivityPubEventHandler, + federation::ApFederationConfig, + followers_handler::{followers_handler, following_handler}, + inbox::inbox_handler, + outbox::outbox_handler, + repository::{FederationRepository, RemoteActor}, + webfinger::webfinger_handler, +}; + +pub struct ActivityPubService { + federation_config: ApFederationConfig, + base_url: String, +} + +impl ActivityPubService { + pub async fn new( + repo: Arc, + user_repo: Arc, + base_url: String, + debug: bool, + ) -> anyhow::Result { + let data = FederationData::new(repo, user_repo, base_url.clone()); + let federation_config = ApFederationConfig::new(data, debug).await?; + Ok(Self { + federation_config, + base_url, + }) + } + + pub fn federation_config(&self) -> &ApFederationConfig { + &self.federation_config + } + + pub fn request_data(&self) -> Data { + self.federation_config.to_request_data() + } + + pub fn router(&self) -> Router { + Router::new() + .route("/.well-known/webfinger", get(webfinger_handler)) + .route("/users/{user_id}", get(actor_handler)) + .route("/users/{user_id}/inbox", post(inbox_handler)) + .route("/users/{user_id}/outbox", get(outbox_handler)) + .route("/users/{user_id}/followers", get(followers_handler)) + .route("/users/{user_id}/following", get(following_handler)) + .layer(self.federation_config.middleware()) + } + + pub fn event_handler(&self) -> ActivityPubEventHandler { + ActivityPubEventHandler::new(self.federation_config.clone(), self.base_url.clone()) + } + + pub async fn follow(&self, local_user_id: UserId, handle: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + + let remote_actor: DbActor = webfinger_resolve_actor(handle, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let local_actor = get_local_actor(local_user_id.clone(), &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let follow_id = Url::parse(&format!( + "{}/activities/{}", + self.base_url, + uuid::Uuid::new_v4() + ))?; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_actor.ap_id.clone()), + }; + let follow_with_ctx = WithContext::new_default(follow); + + let sends = SendActivityTask::prepare( + &follow_with_ctx, + &local_actor, + vec![remote_actor.inbox()], + &data, + ) + .await?; + for send in sends { + send.sign_and_send(&data).await?; + } + + let remote = RemoteActor { + url: remote_actor.ap_id.to_string(), + handle: remote_actor + .email + .split('@') + .next() + .unwrap_or(&remote_actor.email) + .to_string(), + inbox_url: remote_actor.inbox_url.to_string(), + shared_inbox_url: None, + display_name: Some(remote_actor.email.clone()), + }; + data.federation_repo + .add_following(local_user_id, remote) + .await?; + + Ok(()) + } + + pub async fn unfollow(&self, local_user_id: UserId, actor_url_str: &str) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + + let remote = data + .federation_repo + .get_remote_actor(actor_url_str) + .await? + .ok_or_else(|| anyhow::anyhow!("remote actor not found: {}", actor_url_str))?; + + let local_actor = get_local_actor(local_user_id.clone(), &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let remote_ap_id = Url::parse(actor_url_str)?; + let inbox = Url::parse(&remote.inbox_url)?; + + let follow_id = Url::parse(&format!( + "{}/activities/{}", + self.base_url, + uuid::Uuid::new_v4() + ))?; + let follow = FollowActivity { + id: follow_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: ObjectId::from(remote_ap_id), + }; + + let undo_id = Url::parse(&format!( + "{}/activities/{}", + self.base_url, + uuid::Uuid::new_v4() + ))?; + let undo = UndoActivity { + id: undo_id, + kind: Default::default(), + actor: ObjectId::from(local_actor.ap_id.clone()), + object: follow, + }; + let undo_with_ctx = WithContext::new_default(undo); + + let sends = + SendActivityTask::prepare(&undo_with_ctx, &local_actor, vec![inbox], &data).await?; + for send in sends { + send.sign_and_send(&data).await?; + } + + data.federation_repo + .remove_following(local_user_id, actor_url_str) + .await?; + + Ok(()) + } + + pub async fn get_following(&self, local_user_id: UserId) -> anyhow::Result> { + let data = self.federation_config.to_request_data(); + data.federation_repo.get_following(local_user_id).await + } + + pub async fn count_following(&self, local_user_id: UserId) -> anyhow::Result { + let data = self.federation_config.to_request_data(); + data.federation_repo.count_following(local_user_id).await + } +} diff --git a/crates/adapters/activitypub/src/webfinger.rs b/crates/adapters/activitypub/src/webfinger.rs new file mode 100644 index 0000000..1ce056b --- /dev/null +++ b/crates/adapters/activitypub/src/webfinger.rs @@ -0,0 +1,48 @@ +use activitypub_federation::{ + config::Data, + fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger}, +}; +use axum::{ + extract::Query, + http::header, + response::{IntoResponse, Response}, +}; +use serde::Deserialize; + +use crate::actors::actor_url; +use crate::data::FederationData; +use crate::error::Error; + +#[derive(Deserialize)] +pub struct WebfingerQuery { + resource: String, +} + +pub async fn webfinger_handler( + Query(query): Query, + data: Data, +) -> Result { + let name = extract_webfinger_name(&query.resource, &data)?; + + // Look up user by email username@domain + let email_str = format!("{}@{}", name, data.domain); + let email = domain::value_objects::Email::new(email_str) + .map_err(|e| Error(anyhow::anyhow!("{}", e)))?; + + let user = data + .user_repo + .find_by_email(&email) + .await + .map_err(|e| Error(e.into()))? + .ok_or_else(|| Error(anyhow::anyhow!("user not found")))?; + + let ap_id = actor_url(&data.base_url, user.id()); + + let wf: Webfinger = build_webfinger_response(query.resource, ap_id); + let body = serde_json::to_string(&wf) + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + Ok(( + [(header::CONTENT_TYPE, "application/jrd+json")], + body, + ).into_response()) +} diff --git a/crates/adapters/sqlite/Cargo.toml b/crates/adapters/sqlite/Cargo.toml index 1b9e049..f226e45 100644 --- a/crates/adapters/sqlite/Cargo.toml +++ b/crates/adapters/sqlite/Cargo.toml @@ -12,6 +12,8 @@ sqlx = { version = "0.8.6", features = [ ] } domain = { workspace = true } +activitypub = { workspace = true } +anyhow = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } tracing = { workspace = true } diff --git a/crates/adapters/sqlite/migrations/0003_activitypub.sql b/crates/adapters/sqlite/migrations/0003_activitypub.sql new file mode 100644 index 0000000..f23b41e --- /dev/null +++ b/crates/adapters/sqlite/migrations/0003_activitypub.sql @@ -0,0 +1,28 @@ +ALTER TABLE reviews ADD COLUMN remote_actor_url TEXT; +CREATE TABLE ap_followers ( + local_user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + remote_actor_url TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + created_at TEXT NOT NULL, + PRIMARY KEY (local_user_id, remote_actor_url) +); +CREATE TABLE ap_following ( + local_user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + remote_actor_url TEXT NOT NULL, + created_at TEXT NOT NULL, + PRIMARY KEY (local_user_id, remote_actor_url) +); +CREATE TABLE ap_remote_actors ( + url TEXT PRIMARY KEY, + handle TEXT NOT NULL, + inbox_url TEXT NOT NULL, + shared_inbox_url TEXT, + display_name TEXT, + fetched_at TEXT NOT NULL +); +CREATE TABLE IF NOT EXISTS ap_local_actors ( + user_id TEXT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE, + public_key TEXT NOT NULL, + private_key TEXT NOT NULL, + created_at TEXT NOT NULL +); diff --git a/crates/adapters/sqlite/src/federation.rs b/crates/adapters/sqlite/src/federation.rs new file mode 100644 index 0000000..3db32e0 --- /dev/null +++ b/crates/adapters/sqlite/src/federation.rs @@ -0,0 +1,332 @@ +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use chrono::Utc; +use sqlx::{Row, SqlitePool}; + +use activitypub::repository::{FederationRepository, Follower, FollowerStatus, RemoteActor}; +use domain::models::{Review, ReviewSource}; +use domain::value_objects::UserId; + +use crate::models::datetime_to_str; + +pub struct SqliteFederationRepository { + pool: SqlitePool, +} + +impl SqliteFederationRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +fn status_to_str(status: &FollowerStatus) -> &'static str { + match status { + FollowerStatus::Pending => "pending", + FollowerStatus::Accepted => "accepted", + FollowerStatus::Rejected => "rejected", + } +} + +fn str_to_status(s: &str) -> FollowerStatus { + match s { + "accepted" => FollowerStatus::Accepted, + "rejected" => FollowerStatus::Rejected, + _ => FollowerStatus::Pending, + } +} + +#[async_trait] +impl FederationRepository for SqliteFederationRepository { + async fn add_follower( + &self, + local_user_id: UserId, + remote_actor_url: &str, + status: FollowerStatus, + ) -> Result<()> { + let uid = local_user_id.value().to_string(); + let status_str = status_to_str(&status); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + + sqlx::query( + "INSERT INTO ap_followers (local_user_id, remote_actor_url, status, created_at) + VALUES (?1, ?2, ?3, ?4) + ON CONFLICT(local_user_id, remote_actor_url) DO UPDATE SET status = excluded.status", + ) + .bind(&uid) + .bind(remote_actor_url) + .bind(status_str) + .bind(&created_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn remove_follower(&self, local_user_id: UserId, remote_actor_url: &str) -> Result<()> { + let uid = local_user_id.value().to_string(); + + sqlx::query( + "DELETE FROM ap_followers WHERE local_user_id = ? AND remote_actor_url = ?", + ) + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn get_followers(&self, local_user_id: UserId) -> Result> { + let uid = local_user_id.value().to_string(); + + let rows = sqlx::query( + "SELECT f.remote_actor_url, f.status, + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_followers f + LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ?", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + + let followers = rows + .into_iter() + .map(|row| { + let url: String = row.get("remote_actor_url"); + let status_str: String = row.get("status"); + let handle: String = row.try_get("handle").unwrap_or_default(); + let inbox_url: String = row.try_get("inbox_url").unwrap_or_default(); + let shared_inbox_url: Option = row.try_get("shared_inbox_url").ok().flatten(); + let display_name: Option = row.try_get("display_name").ok().flatten(); + + Follower { + actor: RemoteActor { + url, + handle, + inbox_url, + shared_inbox_url, + display_name, + }, + status: str_to_status(&status_str), + } + }) + .collect(); + + Ok(followers) + } + + async fn update_follower_status( + &self, + local_user_id: UserId, + remote_actor_url: &str, + status: FollowerStatus, + ) -> Result<()> { + let uid = local_user_id.value().to_string(); + let status_str = status_to_str(&status); + + let result = sqlx::query( + "UPDATE ap_followers SET status = ? WHERE local_user_id = ? AND remote_actor_url = ?", + ) + .bind(status_str) + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + + if result.rows_affected() == 0 { + tracing::warn!( + local_user_id = %local_user_id.value(), + remote_actor_url = remote_actor_url, + "update_follower_status: no row found" + ); + } + + Ok(()) + } + + async fn add_following(&self, local_user_id: UserId, actor: RemoteActor) -> Result<()> { + let uid = local_user_id.value().to_string(); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + + self.upsert_remote_actor(actor.clone()).await?; + + sqlx::query( + "INSERT OR IGNORE INTO ap_following (local_user_id, remote_actor_url, created_at) + VALUES (?, ?, ?)", + ) + .bind(&uid) + .bind(&actor.url) + .bind(&created_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn remove_following(&self, local_user_id: UserId, actor_url: &str) -> Result<()> { + let uid = local_user_id.value().to_string(); + + sqlx::query( + "DELETE FROM ap_following WHERE local_user_id = ? AND remote_actor_url = ?", + ) + .bind(&uid) + .bind(actor_url) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn get_following(&self, local_user_id: UserId) -> Result> { + let uid = local_user_id.value().to_string(); + + let rows = sqlx::query( + "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_following f + INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ?", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + + let actors = rows + .into_iter() + .map(|row| RemoteActor { + url: row.get("url"), + handle: row.get("handle"), + inbox_url: row.get("inbox_url"), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + }) + .collect(); + + Ok(actors) + } + + async fn count_following(&self, local_user_id: UserId) -> Result { + let uid = local_user_id.value().to_string(); + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM ap_following WHERE local_user_id = ?", + ) + .bind(&uid) + .fetch_one(&self.pool) + .await?; + Ok(count as usize) + } + + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { + let now = Utc::now().naive_utc(); + let fetched_at = datetime_to_str(&now); + + sqlx::query( + "INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, fetched_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(url) DO UPDATE SET + handle = excluded.handle, + inbox_url = excluded.inbox_url, + shared_inbox_url = excluded.shared_inbox_url, + display_name = excluded.display_name, + fetched_at = excluded.fetched_at", + ) + .bind(&actor.url) + .bind(&actor.handle) + .bind(&actor.inbox_url) + .bind(&actor.shared_inbox_url) + .bind(&actor.display_name) + .bind(&fetched_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn get_remote_actor(&self, actor_url: &str) -> Result> { + let row = sqlx::query( + "SELECT url, handle, inbox_url, shared_inbox_url, display_name + FROM ap_remote_actors WHERE url = ?", + ) + .bind(actor_url) + .fetch_optional(&self.pool) + .await?; + + Ok(row.map(|row| RemoteActor { + url: row.get("url"), + handle: row.get("handle"), + inbox_url: row.get("inbox_url"), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + })) + } + + async fn get_local_actor_keypair(&self, user_id: UserId) -> Result> { + let uid = user_id.value().to_string(); + let row = sqlx::query( + "SELECT public_key, private_key FROM ap_local_actors WHERE user_id = ?", + ) + .bind(&uid) + .fetch_optional(&self.pool) + .await?; + + Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) + } + + async fn save_local_actor_keypair(&self, user_id: UserId, public_key: String, private_key: String) -> Result<()> { + let uid = user_id.value().to_string(); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + + sqlx::query( + "INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(user_id) DO UPDATE SET + public_key = excluded.public_key, + private_key = excluded.private_key", + ) + .bind(&uid) + .bind(&public_key) + .bind(&private_key) + .bind(&created_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn save_remote_review(&self, review: &Review) -> Result<()> { + let actor_url = match review.source() { + ReviewSource::Remote { actor_url } => actor_url.clone(), + ReviewSource::Local => { + return Err(anyhow!("save_remote_review called with a local review")); + } + }; + + let id = review.id().value().to_string(); + let movie_id = review.movie_id().value().to_string(); + let user_id = review.user_id().value().to_string(); + let rating = review.rating().value() as i64; + let comment = review.comment().map(|c| c.value().to_string()); + let watched_at = datetime_to_str(review.watched_at()); + let created_at = datetime_to_str(review.created_at()); + + sqlx::query( + "INSERT OR IGNORE INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(&movie_id) + .bind(&user_id) + .bind(rating) + .bind(&comment) + .bind(&watched_at) + .bind(&created_at) + .bind(&actor_url) + .execute(&self.pool) + .await?; + + Ok(()) + } +} diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index aee9369..a4d49ae 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -4,7 +4,7 @@ use domain::{ events::DomainEvent, models::{ DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, Movie, MonthlyRating, - Review, ReviewHistory, SortDirection, UserStats, UserTrends, + Review, ReviewHistory, ReviewSource, SortDirection, UserStats, UserTrends, collections::{PageParams, Paginated}, }, ports::MovieRepository, @@ -12,6 +12,7 @@ use domain::{ }; use sqlx::SqlitePool; +mod federation; mod migrations; mod models; mod users; @@ -21,6 +22,7 @@ use models::{ UserTotalsRow, datetime_to_str, }; +pub use federation::SqliteFederationRepository; pub use users::SqliteUserRepository; fn format_year_month(ym: &str) -> String { @@ -80,7 +82,7 @@ impl SqliteMovieRepository { SortDirection::Descending | SortDirection::ByRatingDesc => sqlx::query_as!( DiaryRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id ORDER BY r.watched_at DESC @@ -95,7 +97,7 @@ impl SqliteMovieRepository { SortDirection::Ascending => sqlx::query_as!( DiaryRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id ORDER BY r.watched_at ASC @@ -121,7 +123,7 @@ impl SqliteMovieRepository { SortDirection::Descending | SortDirection::ByRatingDesc => sqlx::query_as!( DiaryRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.movie_id = ? @@ -138,7 +140,7 @@ impl SqliteMovieRepository { SortDirection::Ascending => sqlx::query_as!( DiaryRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.movie_id = ? @@ -173,7 +175,7 @@ impl SqliteMovieRepository { sqlx::query_as!( DiaryRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ? @@ -195,7 +197,7 @@ impl SqliteMovieRepository { sqlx::query_as!( DiaryRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ? @@ -223,7 +225,7 @@ impl SqliteMovieRepository { sqlx::query_as!( FeedRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url, u.email AS user_email FROM reviews r INNER JOIN movies m ON m.id = r.movie_id @@ -386,17 +388,22 @@ impl MovieRepository for SqliteMovieRepository { let comment = review.comment().map(|c| c.value().to_string()); let watched_at = datetime_to_str(review.watched_at()); let created_at = datetime_to_str(review.created_at()); + let remote_actor_url = match review.source() { + ReviewSource::Local => None, + ReviewSource::Remote { actor_url } => Some(actor_url.clone()), + }; sqlx::query!( - "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", id, movie_id, user_id, rating, comment, watched_at, - created_at + created_at, + remote_actor_url ) .execute(&self.pool) .await @@ -464,7 +471,7 @@ impl MovieRepository for SqliteMovieRepository { let id = review_id.value().to_string(); sqlx::query_as!( ReviewRow, - "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at + "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url FROM reviews WHERE id = ?", id ) @@ -510,7 +517,7 @@ impl MovieRepository for SqliteMovieRepository { let viewings = sqlx::query_as!( ReviewRow, - "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at + "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC", id_str ) @@ -573,7 +580,7 @@ impl MovieRepository for SqliteMovieRepository { let rows = sqlx::query_as!( DiaryRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ? diff --git a/crates/adapters/sqlite/src/models.rs b/crates/adapters/sqlite/src/models.rs index 3f4abf7..269177a 100644 --- a/crates/adapters/sqlite/src/models.rs +++ b/crates/adapters/sqlite/src/models.rs @@ -1,7 +1,7 @@ use chrono::NaiveDateTime; use domain::{ errors::DomainError, - models::{DiaryEntry, FeedEntry, Movie, Review, UserSummary}, + models::{DiaryEntry, FeedEntry, Movie, Review, ReviewSource, UserSummary}, value_objects::{ Comment, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear, ReviewId, UserId, @@ -49,6 +49,7 @@ pub(crate) struct ReviewRow { pub comment: Option, pub watched_at: String, pub created_at: String, + pub remote_actor_url: Option, } impl ReviewRow { @@ -60,8 +61,12 @@ impl ReviewRow { let comment = self.comment.map(Comment::new).transpose()?; let watched_at = parse_datetime(&self.watched_at)?; let created_at = parse_datetime(&self.created_at)?; + let source = match self.remote_actor_url { + None => ReviewSource::Local, + Some(url) => ReviewSource::Remote { actor_url: url }, + }; Ok(Review::from_persistence( - id, movie_id, user_id, rating, comment, watched_at, created_at, + id, movie_id, user_id, rating, comment, watched_at, created_at, source, )) } } @@ -82,6 +87,7 @@ pub(crate) struct DiaryRow { pub comment: Option, pub watched_at: String, pub created_at: String, + pub remote_actor_url: Option, } impl DiaryRow { @@ -104,6 +110,7 @@ impl DiaryRow { comment: self.comment, watched_at: self.watched_at, created_at: self.created_at, + remote_actor_url: self.remote_actor_url, } .to_domain()?; @@ -127,6 +134,7 @@ pub(crate) struct FeedRow { pub comment: Option, pub watched_at: String, pub created_at: String, + pub remote_actor_url: Option, pub user_email: String, } @@ -146,6 +154,7 @@ impl FeedRow { comment: self.comment, watched_at: self.watched_at, created_at: self.created_at, + remote_actor_url: self.remote_actor_url, } .to_domain()?; Ok(FeedEntry::new(diary, self.user_email)) diff --git a/crates/adapters/template-askama/src/lib.rs b/crates/adapters/template-askama/src/lib.rs index 7239d00..e099255 100644 --- a/crates/adapters/template-askama/src/lib.rs +++ b/crates/adapters/template-askama/src/lib.rs @@ -1,12 +1,12 @@ use askama::Template; use chrono::Datelike; use application::ports::{ - ActivityFeedPageData, HtmlPageContext, HtmlRenderer, LoginPageData, + ActivityFeedPageData, FollowingPageData, HtmlPageContext, HtmlRenderer, LoginPageData, NewReviewPageData, ProfilePageData, RegisterPageData, UsersPageData, }; use domain::models::{ - DiaryEntry, FeedEntry, MonthActivity, MonthlyRating, UserStats, UserSummary, UserTrends, - collections::Paginated, + DiaryEntry, FeedEntry, MonthActivity, MonthlyRating, ReviewSource, UserStats, UserSummary, + UserTrends, collections::Paginated, }; struct PageItem { @@ -110,6 +110,24 @@ struct ProfileTemplate<'a> { monthly_rating_rows: Vec>, heatmap: Vec, page_items: Vec, + is_own_profile: bool, + error: Option, + following_count: usize, +} + +struct RemoteActorData { + handle: String, + display_name: Option, + url: String, +} + +#[derive(Template)] +#[template(path = "following.html")] +struct FollowingTemplate { + ctx: HtmlPageContext, + user_id: uuid::Uuid, + actors: Vec, + error: Option, } struct HeatmapCell { @@ -276,6 +294,24 @@ impl HtmlRenderer for AskamaHtmlRenderer { monthly_rating_rows, heatmap, page_items: build_page_items(total_pages, current_page), + is_own_profile: data.is_own_profile, + error: data.error, + following_count: data.following_count, + } + .render() + .map_err(|e| e.to_string()) + } + + fn render_following_page(&self, data: FollowingPageData) -> Result { + FollowingTemplate { + ctx: data.ctx, + user_id: data.user_id, + actors: data.actors.into_iter().map(|a| RemoteActorData { + handle: a.handle, + display_name: a.display_name, + url: a.url, + }).collect(), + error: data.error, } .render() .map_err(|e| e.to_string()) diff --git a/crates/adapters/template-askama/templates/activity_feed.html b/crates/adapters/template-askama/templates/activity_feed.html index 6eaf84a..ed5df2d 100644 --- a/crates/adapters/template-askama/templates/activity_feed.html +++ b/crates/adapters/template-askama/templates/activity_feed.html @@ -27,6 +27,11 @@
{{ entry.user_display_name() }} {{ entry.review().watched_at().format("%b %-d, %Y") }} + {% match entry.review().source() %} + {% when ReviewSource::Remote with { actor_url } %} + ↗ federated + {% when ReviewSource::Local %} + {% endmatch %}
{% if ctx.is_current_user(entry.review().user_id().value()) %}
diff --git a/crates/adapters/template-askama/templates/following.html b/crates/adapters/template-askama/templates/following.html new file mode 100644 index 0000000..689afab --- /dev/null +++ b/crates/adapters/template-askama/templates/following.html @@ -0,0 +1,26 @@ +{% extends "base.html" %} +{% block content %} +

Following

+{% if let Some(err) = error %} +

{{ err }}

+{% endif %} +{% if actors.is_empty() %} +

Not following anyone yet. Follow remote users from your profile page.

+{% else %} +
    +{% for actor in actors %} +
  • + {{ actor.handle }} + {% if let Some(name) = actor.display_name %} + ({{ name }}) + {% endif %} + {{ actor.url }} + + + +
  • + +{% endfor %} +
+{% endif %} +{% endblock %} diff --git a/crates/adapters/template-askama/templates/profile.html b/crates/adapters/template-askama/templates/profile.html index 84f1521..937f7fe 100644 --- a/crates/adapters/template-askama/templates/profile.html +++ b/crates/adapters/template-askama/templates/profile.html @@ -24,6 +24,20 @@ + {% if is_own_profile %} + + View following ({{ following_count }}) + {% endif %} +
Recent Top Rated diff --git a/crates/application/src/ports.rs b/crates/application/src/ports.rs index 5521cad..eee8906 100644 --- a/crates/application/src/ports.rs +++ b/crates/application/src/ports.rs @@ -2,6 +2,12 @@ use uuid::Uuid; use domain::models::{DiaryEntry, FeedEntry, MonthActivity, UserStats, UserSummary, UserTrends, collections::Paginated}; +pub struct RemoteActorView { + pub handle: String, + pub display_name: Option, + pub url: String, +} + pub struct HtmlPageContext { pub user_email: Option, pub user_id: Option, @@ -57,6 +63,16 @@ pub struct ProfilePageData { pub limit: u32, pub history: Option>, pub trends: Option, + pub is_own_profile: bool, + pub error: Option, + pub following_count: usize, +} + +pub struct FollowingPageData { + pub ctx: HtmlPageContext, + pub user_id: Uuid, + pub actors: Vec, + pub error: Option, } pub trait HtmlRenderer: Send + Sync { @@ -67,6 +83,7 @@ pub trait HtmlRenderer: Send + Sync { fn render_activity_feed_page(&self, data: ActivityFeedPageData) -> Result; fn render_users_page(&self, data: UsersPageData) -> Result; fn render_profile_page(&self, data: ProfilePageData) -> Result; + fn render_following_page(&self, data: FollowingPageData) -> Result; } pub trait RssFeedRenderer: Send + Sync { diff --git a/crates/domain/src/models/mod.rs b/crates/domain/src/models/mod.rs index 0f62ddd..5e4351c 100644 --- a/crates/domain/src/models/mod.rs +++ b/crates/domain/src/models/mod.rs @@ -114,6 +114,18 @@ impl Movie { } } +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ReviewSource { + Local, + Remote { actor_url: String }, +} + +impl Default for ReviewSource { + fn default() -> Self { + ReviewSource::Local + } +} + #[derive(Clone, Debug)] pub struct Review { id: ReviewId, @@ -123,6 +135,7 @@ pub struct Review { comment: Option, watched_at: chrono::NaiveDateTime, created_at: chrono::NaiveDateTime, + source: ReviewSource, } impl Review { @@ -141,6 +154,7 @@ impl Review { comment, watched_at, created_at: Utc::now().naive_utc(), + source: ReviewSource::Local, }) } @@ -152,6 +166,7 @@ impl Review { comment: Option, watched_at: NaiveDateTime, created_at: NaiveDateTime, + source: ReviewSource, ) -> Self { Self { id, @@ -161,6 +176,7 @@ impl Review { comment, watched_at, created_at, + source, } } @@ -185,6 +201,9 @@ impl Review { pub fn created_at(&self) -> &NaiveDateTime { &self.created_at } + pub fn source(&self) -> &ReviewSource { + &self.source + } /// Returns [star1_filled, star2_filled, ..., star5_filled] pub fn stars(&self) -> [bool; 5] { let r = self.rating.value(); diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 856798d..5dca7de 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -26,11 +26,13 @@ metadata = { workspace = true } poster-fetcher = { workspace = true } poster-storage = { workspace = true } sqlite = { workspace = true } +activitypub = { workspace = true } sqlx = { workspace = true } template-askama = { workspace = true } event-publisher = { workspace = true } rss = { workspace = true } infer = "0.19.0" +percent-encoding = "2" [dev-dependencies] tower = { version = "0.5", features = ["util"] } diff --git a/crates/presentation/src/dtos.rs b/crates/presentation/src/dtos.rs index e4dc47c..b88dcfa 100644 --- a/crates/presentation/src/dtos.rs +++ b/crates/presentation/src/dtos.rs @@ -234,11 +234,22 @@ impl From for GetDiaryQuery { } } +#[derive(Deserialize)] +pub struct FollowForm { + pub handle: String, +} + +#[derive(Deserialize)] +pub struct UnfollowForm { + pub actor_url: String, +} + #[derive(serde::Deserialize, Default)] pub struct ProfileQueryParams { pub view: Option, pub limit: Option, pub offset: Option, + pub error: Option, } #[cfg(test)] diff --git a/crates/presentation/src/extractors.rs b/crates/presentation/src/extractors.rs index 434bc33..e275cb9 100644 --- a/crates/presentation/src/extractors.rs +++ b/crates/presentation/src/extractors.rs @@ -149,6 +149,7 @@ mod tests { fn render_activity_feed_page(&self, _: application::ports::ActivityFeedPageData) -> Result { panic!() } fn render_users_page(&self, _: application::ports::UsersPageData) -> Result { panic!() } fn render_profile_page(&self, _: application::ports::ProfilePageData) -> Result { panic!() } + fn render_following_page(&self, _: application::ports::FollowingPageData) -> Result { panic!() } } struct PanicRssRenderer; @@ -179,6 +180,7 @@ mod tests { }, html_renderer: Arc::new(PanicRenderer), rss_renderer: Arc::new(PanicRssRenderer), + ap_service: test_ap_service().await, }; let app = test_router(state); @@ -228,7 +230,33 @@ mod tests { } } - fn panic_state() -> crate::state::AppState { + async fn test_ap_service() -> std::sync::Arc { + use std::sync::Arc; + let pool = sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap(); + sqlx::query("CREATE TABLE IF NOT EXISTS ap_keypairs (user_id TEXT PRIMARY KEY, public_key TEXT NOT NULL, private_key TEXT NOT NULL)") + .execute(&pool).await.unwrap(); + sqlx::query("CREATE TABLE IF NOT EXISTS ap_remote_actors (url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT)") + .execute(&pool).await.unwrap(); + sqlx::query("CREATE TABLE IF NOT EXISTS ap_followers (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', PRIMARY KEY (local_user_id, remote_actor_url))") + .execute(&pool).await.unwrap(); + sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))") + .execute(&pool).await.unwrap(); + let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool)); + struct DummyUserRepo; + #[async_trait::async_trait] impl domain::ports::UserRepository for DummyUserRepo { + async fn find_by_email(&self, _: &domain::value_objects::Email) -> Result, domain::errors::DomainError> { Ok(None) } + async fn save(&self, _: &domain::models::User) -> Result<(), domain::errors::DomainError> { Ok(()) } + async fn find_by_id(&self, _: &domain::value_objects::UserId) -> Result, domain::errors::DomainError> { Ok(None) } + async fn list_with_stats(&self) -> Result, domain::errors::DomainError> { Ok(vec![]) } + } + Arc::new( + activitypub::ActivityPubService::new(fed_repo, Arc::new(DummyUserRepo), "http://localhost:3000".to_string(), true) + .await + .unwrap(), + ) + } + + async fn panic_state() -> crate::state::AppState { use std::sync::Arc; use application::context::AppContext; struct PanicRepo2; @@ -266,6 +294,7 @@ mod tests { fn render_activity_feed_page(&self, _: application::ports::ActivityFeedPageData) -> Result { panic!() } fn render_users_page(&self, _: application::ports::UsersPageData) -> Result { panic!() } fn render_profile_page(&self, _: application::ports::ProfilePageData) -> Result { panic!() } + fn render_following_page(&self, _: application::ports::FollowingPageData) -> Result { panic!() } } struct PanicRssRenderer2; impl crate::ports::RssFeedRenderer for PanicRssRenderer2 { @@ -286,10 +315,11 @@ mod tests { }, html_renderer: Arc::new(PanicRenderer2), rss_renderer: Arc::new(PanicRssRenderer2), + ap_service: test_ap_service().await, } } - fn rejecting_state() -> crate::state::AppState { + async fn rejecting_state() -> crate::state::AppState { use std::sync::Arc; use application::context::AppContext; struct PanicRepo3; @@ -326,6 +356,7 @@ mod tests { fn render_activity_feed_page(&self, _: application::ports::ActivityFeedPageData) -> Result { panic!() } fn render_users_page(&self, _: application::ports::UsersPageData) -> Result { panic!() } fn render_profile_page(&self, _: application::ports::ProfilePageData) -> Result { panic!() } + fn render_following_page(&self, _: application::ports::FollowingPageData) -> Result { panic!() } } struct PanicRssRenderer3; impl crate::ports::RssFeedRenderer for PanicRssRenderer3 { @@ -345,12 +376,13 @@ mod tests { }, html_renderer: Arc::new(PanicRenderer3), rss_renderer: Arc::new(PanicRssRenderer3), + ap_service: test_ap_service().await, } } #[tokio::test] async fn optional_cookie_user_returns_none_without_cookie() { - let app = test_router_optional(panic_state()); + let app = test_router_optional(panic_state().await); let response = app .oneshot(Request::builder().uri("/optional").body(Body::empty()).unwrap()) .await @@ -362,7 +394,7 @@ mod tests { #[tokio::test] async fn optional_cookie_user_returns_none_with_invalid_token() { - let app = test_router_optional(rejecting_state()); + let app = test_router_optional(rejecting_state().await); let response = app .oneshot( Request::builder() @@ -380,7 +412,7 @@ mod tests { #[tokio::test] async fn required_cookie_user_redirects_without_cookie() { - let app = test_router_required(panic_state()); + let app = test_router_required(panic_state().await); let response = app .oneshot(Request::builder().uri("/required").body(Body::empty()).unwrap()) .await @@ -391,7 +423,7 @@ mod tests { #[tokio::test] async fn required_cookie_user_redirects_with_invalid_token() { - let app = test_router_required(rejecting_state()); + let app = test_router_required(rejecting_state().await); let response = app .oneshot( Request::builder() diff --git a/crates/presentation/src/handlers.rs b/crates/presentation/src/handlers.rs index 8a14ab2..b7371b3 100644 --- a/crates/presentation/src/handlers.rs +++ b/crates/presentation/src/handlers.rs @@ -13,13 +13,13 @@ pub mod html { use application::{ commands::{DeleteReviewCommand, LoginCommand, RegisterCommand}, - ports::{HtmlPageContext, LoginPageData, NewReviewPageData, RegisterPageData}, + ports::{FollowingPageData, HtmlPageContext, LoginPageData, NewReviewPageData, RegisterPageData, RemoteActorView}, use_cases::{delete_review, log_review, login as login_uc, register as register_uc}, }; use domain::{errors::DomainError, value_objects::UserId}; use crate::{ - dtos::{DiaryQueryParams, ErrorQuery, LoginForm, LogReviewData, LogReviewForm, RegisterForm}, + dtos::{DiaryQueryParams, ErrorQuery, FollowForm, LoginForm, LogReviewData, LogReviewForm, RegisterForm, UnfollowForm}, extractors::{OptionalCookieUser, RequiredCookieUser}, state::AppState, }; @@ -49,10 +49,8 @@ pub mod html { } fn encode_error(msg: &str) -> String { - msg.replace(' ', "+") - .replace('&', "%26") - .replace('=', "%3D") - .replace('"', "%22") + use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode}; + utf8_percent_encode(msg, NON_ALPHANUMERIC).to_string() } fn secure_flag() -> &'static str { @@ -298,7 +296,7 @@ pub mod html { Path(profile_user_uuid): Path, Query(params): Query, ) -> impl IntoResponse { - let mut ctx = build_page_context(&state, user_id).await; + let mut ctx = build_page_context(&state, user_id.clone()).await; let view = params.view.unwrap_or_else(|| "recent".to_string()); let profile_user = match state.app_ctx.user_repository @@ -315,6 +313,21 @@ pub mod html { ctx.page_title = format!("{}'s Diary — Movies Diary", display_name); ctx.canonical_url = format!("{}/users/{}", state.app_ctx.config.base_url, profile_user_uuid); + let is_own_profile = user_id.as_ref() + .map(|u| u.value() == profile_user_uuid) + .unwrap_or(false); + + let following_count = if is_own_profile { + if let Some(ref uid) = user_id { + state.ap_service.count_following(uid.clone()).await + .unwrap_or(0) + } else { + 0 + } + } else { + 0 + }; + let query = application::queries::GetUserProfileQuery { user_id: profile_user_uuid, view: view.clone(), @@ -343,6 +356,9 @@ pub mod html { limit, history: profile.history, trends: profile.trends, + is_own_profile, + error: params.error, + following_count, }; match state.html_renderer.render_profile_page(data) { Ok(html) => Html(html).into_response(), @@ -352,6 +368,80 @@ pub mod html { Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } + + pub async fn follow_remote_user( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Path(profile_user_uuid): Path, + Form(form): Form, + ) -> impl IntoResponse { + if user_id.value() != profile_user_uuid { + return StatusCode::FORBIDDEN.into_response(); + } + match state.ap_service.follow(user_id.clone(), &form.handle).await { + Ok(()) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(), + Err(e) => { + tracing::error!("follow error: {:?}", e); + let msg = encode_error(&e.to_string()); + Redirect::to(&format!("/users/{}?error={}", profile_user_uuid, msg)).into_response() + } + } + } + + pub async fn unfollow_remote_user( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Path(profile_user_uuid): Path, + Form(form): Form, + ) -> impl IntoResponse { + if user_id.value() != profile_user_uuid { + return StatusCode::FORBIDDEN.into_response(); + } + match state.ap_service.unfollow(user_id.clone(), &form.actor_url).await { + Ok(()) => Redirect::to(&format!("/users/{}/following-list", profile_user_uuid)).into_response(), + Err(e) => { + let msg = encode_error(&e.to_string()); + Redirect::to(&format!("/users/{}/following-list?error={}", profile_user_uuid, msg)).into_response() + } + } + } + + pub async fn get_following_page( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Path(profile_user_uuid): Path, + Query(params): Query, + ) -> impl IntoResponse { + if user_id.value() != profile_user_uuid { + return StatusCode::FORBIDDEN.into_response(); + } + let mut ctx = build_page_context(&state, Some(user_id.clone())).await; + ctx.page_title = "Following — Movies Diary".to_string(); + ctx.canonical_url = format!("{}/users/{}/following-list", state.app_ctx.config.base_url, profile_user_uuid); + match state.ap_service.get_following(user_id).await { + Ok(following) => { + let actors = following.into_iter().map(|a| RemoteActorView { + handle: a.handle, + display_name: a.display_name, + url: a.url, + }).collect(); + let data = FollowingPageData { + ctx, + user_id: profile_user_uuid, + actors, + error: params.error, + }; + match state.html_renderer.render_following_page(data) { + Ok(html) => Html(html).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e).into_response(), + } + } + Err(e) => { + tracing::error!("get_following error: {:?}", e); + (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load following list").into_response() + } + } + } } pub mod posters { diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index a5afadc..f20c0f1 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -15,7 +15,8 @@ use auth::{AuthConfig, Argon2PasswordHasher, JwtAuthService}; use metadata::MetadataClientImpl; use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher}; use poster_storage::{PosterStorageAdapter, StorageConfig}; -use sqlite::{SqliteMovieRepository, SqliteUserRepository}; +use activitypub::ActivityPubService; +use sqlite::{SqliteFederationRepository, SqliteMovieRepository, SqliteUserRepository}; use rss::RssAdapter; use template_askama::AskamaHtmlRenderer; @@ -70,7 +71,7 @@ async fn wire_dependencies() -> anyhow::Result { PosterFetcherClient, PosterStorage, UserRepository, }; let repository: Arc = Arc::new(movie_repo); - let user_repository: Arc = Arc::new(SqliteUserRepository::new(pool)); + let user_repository: Arc = Arc::new(SqliteUserRepository::new(pool.clone())); let metadata_client: Arc = Arc::new(MetadataClientImpl::new_omdb(omdb_api_key)); let poster_fetcher: Arc = Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?); let poster_storage: Arc = Arc::new(PosterStorageAdapter::from_config(storage_config)?); @@ -91,10 +92,23 @@ async fn wire_dependencies() -> anyhow::Result { config: app_config.clone(), }; + // Federation + let federation_repo = Arc::new(SqliteFederationRepository::new(pool)); + let ap_service = Arc::new( + ActivityPubService::new( + federation_repo, + Arc::clone(&user_repository), + app_config.base_url.clone(), + cfg!(debug_assertions), + ) + .await?, + ); + let ap_event_handler = ap_service.event_handler(); + let poster_handler = PosterSyncHandler::new(handler_ctx, 3); let (event_publisher, event_worker) = create_event_channel( EventPublisherConfig::from_env(), - vec![Box::new(poster_handler)], + vec![Box::new(poster_handler), Box::new(ap_event_handler)], ); tokio::spawn(event_worker.run()); @@ -116,6 +130,7 @@ async fn wire_dependencies() -> anyhow::Result { rss_renderer: Arc::new(RssAdapter::new( std::env::var("BASE_URL").unwrap_or_else(|_| "http://localhost:3000".into()), )), + ap_service, }) } diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index 6550615..61414d8 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -9,6 +9,21 @@ use tower_http::{services::ServeDir, trace::TraceLayer}; use crate::{handlers, state::AppState}; +/// Build an ActivityPub router from the service, excluding routes that +/// conflict with HTML routes (/users/{id} and /users/{id}/following). +/// Those AP endpoints are still served via the federation middleware layer +/// applied to the whole AP router scope; the conflicting paths will need +/// content-negotiation wrappers added in Phase 5. +fn ap_routes(state: &AppState) -> Router { + let config = state.ap_service.federation_config(); + Router::new() + .route("/.well-known/webfinger", routing::get(activitypub::webfinger::webfinger_handler)) + .route("/users/{user_id}/inbox", routing::post(activitypub::inbox::inbox_handler)) + .route("/users/{user_id}/outbox", routing::get(activitypub::outbox::outbox_handler)) + .route("/users/{user_id}/followers", routing::get(activitypub::followers_handler::followers_handler)) + .layer(config.middleware()) +} + /// Simple global rate limiter: tracks request count per 60-second window. /// Not per-IP — suitable for a low-traffic personal app. #[derive(Clone)] @@ -47,12 +62,14 @@ impl RateLimiter { pub fn build_router(state: AppState) -> Router { let rate_limit = state.app_ctx.config.rate_limit; + let ap_router = ap_routes(&state); Router::new() .merge(html_routes(rate_limit)) .merge(api_routes(rate_limit)) .nest_service("/static", ServeDir::new("static")) .layer(TraceLayer::new_for_http()) .with_state(state) + .merge(ap_router) } fn html_routes(rate_limit: u64) -> Router { @@ -87,6 +104,18 @@ fn html_routes(rate_limit: u64) -> Router { "/users/{id}", routing::get(handlers::html::get_user_profile), ) + .route( + "/users/{id}/follow", + routing::post(handlers::html::follow_remote_user), + ) + .route( + "/users/{id}/unfollow", + routing::post(handlers::html::unfollow_remote_user), + ) + .route( + "/users/{id}/following-list", + routing::get(handlers::html::get_following_page), + ) .merge(auth) .route( "/reviews/new", diff --git a/crates/presentation/src/state.rs b/crates/presentation/src/state.rs index 7da77ff..c535870 100644 --- a/crates/presentation/src/state.rs +++ b/crates/presentation/src/state.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use activitypub::ActivityPubService; use application::context::AppContext; use crate::ports::{HtmlRenderer, RssFeedRenderer}; @@ -9,4 +10,5 @@ pub struct AppState { pub app_ctx: AppContext, pub html_renderer: Arc, pub rss_renderer: Arc, + pub ap_service: Arc, } diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index 515f37b..acf1d52 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -88,6 +88,32 @@ impl UserRepository for NobodyUserRepo { async fn list_with_stats(&self) -> Result, DomainError> { panic!() } } +async fn test_ap_service() -> Arc { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + sqlx::query("CREATE TABLE IF NOT EXISTS ap_keypairs (user_id TEXT PRIMARY KEY, public_key TEXT NOT NULL, private_key TEXT NOT NULL)") + .execute(&pool).await.unwrap(); + sqlx::query("CREATE TABLE IF NOT EXISTS ap_remote_actors (url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT)") + .execute(&pool).await.unwrap(); + sqlx::query("CREATE TABLE IF NOT EXISTS ap_followers (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', PRIMARY KEY (local_user_id, remote_actor_url))") + .execute(&pool).await.unwrap(); + sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))") + .execute(&pool).await.unwrap(); + let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool)); + struct DummyUserRepo; + #[async_trait] + impl UserRepository for DummyUserRepo { + async fn find_by_email(&self, _: &Email) -> Result, DomainError> { Ok(None) } + async fn save(&self, _: &User) -> Result<(), DomainError> { Ok(()) } + async fn find_by_id(&self, _: &UserId) -> Result, DomainError> { Ok(None) } + async fn list_with_stats(&self) -> Result, DomainError> { Ok(vec![]) } + } + Arc::new( + activitypub::ActivityPubService::new(fed_repo, Arc::new(DummyUserRepo), "http://localhost:3000".to_string(), true) + .await + .unwrap(), + ) +} + async fn test_app() -> Router { let pool = SqlitePool::connect("sqlite::memory:") .await @@ -109,6 +135,7 @@ async fn test_app() -> Router { }, html_renderer: Arc::new(AskamaHtmlRenderer::new()), rss_renderer: Arc::new(RssAdapter::new("http://localhost:3000".into())), + ap_service: test_ap_service().await, }; routes::build_router(state) diff --git a/static/style.css b/static/style.css index 502718f..7680e75 100644 --- a/static/style.css +++ b/static/style.css @@ -517,3 +517,35 @@ form button[type="submit"]:hover { .director-bar { flex: 1; background: rgba(255,255,255,0.08); border-radius: 4px; height: 10px; overflow: hidden; } .director-bar-fill { height: 100%; background: var(--primary); border-radius: 4px; opacity: 0.8; } .director-count { font-size: 0.8rem; opacity: 0.5; width: 20px; text-align: right; } + +/* ---- ActivityPub federation ---- */ +.remote-badge { + font-size: 0.7rem; + color: rgba(255,255,255,0.5); + background: rgba(255,255,255,0.08); + border-radius: 3px; + padding: 1px 5px; + margin-left: 4px; + vertical-align: middle; +} +.follow-section { + margin-top: 1.5rem; + padding-top: 1rem; + border-top: 1px solid rgba(255,255,255,0.1); +} +.follow-section input[type="text"] { + padding: 0.4rem; + border: 1px solid rgba(255,255,255,0.2); + border-radius: 3px; + min-width: 280px; + background: rgba(255,255,255,0.08); + color: inherit; +} +.following-list { list-style: none; padding: 0; } +.following-item { + padding: 0.5rem 0; + border-bottom: 1px solid rgba(255,255,255,0.07); + display: flex; + align-items: center; + gap: 0.5rem; +}