From 624cfe5799b3d15d88f07923be9bbdee6f81ff74 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 29 May 2026 10:42:53 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20migrate=20k-ap=200.1.10=E2=86=920.3.1,?= =?UTF-8?q?=20fix=20AP=20gaps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - split FederationRepository into FollowRepository, ActorRepository, BlocklistRepository, ActivityRepository - RemoteActor: 5 new fields (bio, banner_url, followers_url, following_url, also_known_as) - ApObjectHandler split: get_local_objects_page/count_local_posts → ApContentReader trait - builder API: positional args → named setters - broadcast_create_note/update_note: add ApVisibility + mentioned_inboxes params - backfill_outbox → import_remote_outbox - ApUser: also_known_as Option → Vec, new fields AP gaps fixed: - add GET /users/{id}/followers + /following with content negotiation - wire EventPublisher into builder via FederationEventBridge adapter - add display_name field full stack (domain→DB→API→AP) - DB-side outbox pagination (get_local_reviews_page) - set featured_url on ApUser --- Cargo.lock | 22 +- crates/adapters/activitypub/Cargo.toml | 2 +- .../activitypub/src/composite_handler.rs | 28 +- .../adapters/activitypub/src/event_handler.rs | 6 +- .../src/federation_event_bridge.rs | 45 ++ crates/adapters/activitypub/src/lib.rs | 45 +- crates/adapters/activitypub/src/port.rs | 22 +- .../activitypub/src/review_handler.rs | 75 +- .../adapters/activitypub/src/user_adapter.rs | 15 +- .../activitypub/src/watchlist_handler.rs | 61 +- .../adapters/postgres-federation/Cargo.toml | 18 +- .../adapters/postgres-federation/src/lib.rs | 708 ++++++------------ .../postgres/migrations/0020_kap_v03.sql | 12 + .../migrations/0021_user_display_name.sql | 1 + crates/adapters/postgres/src/ap_content.rs | 51 ++ crates/adapters/postgres/src/users.rs | 18 +- crates/adapters/sqlite-federation/Cargo.toml | 3 +- crates/adapters/sqlite-federation/src/lib.rs | 525 +++++++------ .../sqlite/migrations/0020_kap_v03.sql | 12 + .../migrations/0021_user_display_name.sql | 1 + crates/adapters/sqlite/src/ap_content.rs | 45 ++ crates/adapters/sqlite/src/users.rs | 151 ++-- crates/application/src/commands.rs | 1 + .../src/use_cases/update_profile.rs | 1 + crates/domain/src/models/mod.rs | 9 + crates/domain/src/ports.rs | 8 + crates/presentation/src/handlers/api.rs | 7 + crates/presentation/src/handlers/html.rs | 57 ++ crates/presentation/src/main.rs | 7 +- crates/presentation/src/routes.rs | 8 + crates/worker/src/follow_backfill_handler.rs | 2 +- crates/worker/src/main.rs | 7 +- 32 files changed, 1016 insertions(+), 957 deletions(-) create mode 100644 crates/adapters/activitypub/src/federation_event_bridge.rs create mode 100644 crates/adapters/postgres/migrations/0020_kap_v03.sql create mode 100644 crates/adapters/postgres/migrations/0021_user_display_name.sql create mode 100644 crates/adapters/sqlite/migrations/0020_kap_v03.sql create mode 100644 crates/adapters/sqlite/migrations/0021_user_display_name.sql diff --git a/Cargo.lock b/Cargo.lock index 13fa2aa..e18d8ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2770,8 +2770,9 @@ dependencies = [ [[package]] name = "k-ap" -version = "0.1.10" -source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.10#d80cfd0431205498161db8665fd884710866ca95" +version = "0.3.1" +source = "sparse+https://git.gabrielkaszewski.dev/api/packages/GKaszewski/cargo/" +checksum = "f73de37ac4feab6d7b78e73c60acbb07933c2be58dcbb12e8a34201f66e0480d" dependencies = [ "activitypub_federation", "anyhow", @@ -2787,6 +2788,7 @@ dependencies = [ "tracing", "url", "uuid", + "zeroize", ] [[package]] @@ -3749,6 +3751,7 @@ dependencies = [ "chrono", "domain", "k-ap", + "serde_json", "sqlx", "tracing", "uuid", @@ -5014,6 +5017,7 @@ dependencies = [ "chrono", "domain", "k-ap", + "serde_json", "sqlx", "tokio", "tracing", @@ -7060,6 +7064,20 @@ name = "zeroize" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] [[package]] name = "zerotrie" diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml index 180eb9f..b5c4e3d 100644 --- a/crates/adapters/activitypub/Cargo.toml +++ b/crates/adapters/activitypub/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } +k-ap = { version = "0.3.1", registry = "gitea" } domain = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/adapters/activitypub/src/composite_handler.rs b/crates/adapters/activitypub/src/composite_handler.rs index 0399ace..0863738 100644 --- a/crates/adapters/activitypub/src/composite_handler.rs +++ b/crates/adapters/activitypub/src/composite_handler.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use k_ap::ApObjectHandler; +use k_ap::{ApContentReader, ApObjectHandler}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use url::Url; @@ -13,16 +13,7 @@ pub struct CompositeObjectHandler { } #[async_trait] -impl ApObjectHandler for CompositeObjectHandler { - async fn get_local_objects_for_user( - &self, - user_id: uuid::Uuid, - ) -> anyhow::Result> { - let mut results = self.review.get_local_objects_for_user(user_id).await?; - results.extend(self.watchlist.get_local_objects_for_user(user_id).await?); - Ok(results) - } - +impl ApContentReader for CompositeObjectHandler { async fn get_local_objects_page( &self, user_id: uuid::Uuid, @@ -34,6 +25,13 @@ impl ApObjectHandler for CompositeObjectHandler { .await } + async fn count_local_posts(&self) -> anyhow::Result { + self.review.count_local_posts().await + } +} + +#[async_trait] +impl ApObjectHandler for CompositeObjectHandler { async fn on_create( &self, ap_id: &Url, @@ -77,10 +75,6 @@ impl ApObjectHandler for CompositeObjectHandler { Ok(()) } - async fn count_local_posts(&self) -> anyhow::Result { - self.review.count_local_posts().await - } - async fn on_like(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { Ok(()) } @@ -89,6 +83,10 @@ impl ApObjectHandler for CompositeObjectHandler { Ok(()) } + async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + Ok(()) + } + async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { Ok(()) } diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 1f89eb1..322184a 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -8,7 +8,7 @@ use domain::{ }; use std::sync::Arc; -use k_ap::ActivityPubService; +use k_ap::{ActivityPubService, ApVisibility}; use crate::objects::review_to_ap_object; use crate::urls::{actor_url, review_url}; @@ -170,7 +170,7 @@ impl ActivityPubEventHandler { let json = serde_json::to_value(obj)?; self.ap_service - .broadcast_update_note(user_id.value(), json) + .broadcast_update_note(user_id.value(), json, ApVisibility::Public, vec![]) .await?; Ok(()) @@ -227,7 +227,7 @@ impl ActivityPubEventHandler { let json = serde_json::to_value(obj)?; self.ap_service - .broadcast_create_note(user_id.value(), json) + .broadcast_create_note(user_id.value(), json, ApVisibility::Public, vec![]) .await?; Ok(()) } diff --git a/crates/adapters/activitypub/src/federation_event_bridge.rs b/crates/adapters/activitypub/src/federation_event_bridge.rs new file mode 100644 index 0000000..6a47f2a --- /dev/null +++ b/crates/adapters/activitypub/src/federation_event_bridge.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use domain::events::DomainEvent; +use domain::value_objects::UserId; +use k_ap::FederationEvent; + +pub struct FederationEventBridge { + domain_publisher: Arc, +} + +impl FederationEventBridge { + pub fn new(domain_publisher: Arc) -> Self { + Self { domain_publisher } + } +} + +#[async_trait::async_trait] +impl k_ap::EventPublisher for FederationEventBridge { + async fn publish(&self, event: FederationEvent) -> anyhow::Result<()> { + match event { + FederationEvent::BackfillRequested { + owner_user_id, + follower_inbox_url, + } => { + tracing::info!( + owner = %owner_user_id, + inbox = %follower_inbox_url, + "federation BackfillRequested → FollowAccepted" + ); + self.domain_publisher + .publish(&DomainEvent::FollowAccepted { + local_user_id: UserId::from_uuid(owner_user_id), + remote_actor_url: follower_inbox_url.clone(), + outbox_url: follower_inbox_url, + }) + .await + .map_err(|e| anyhow::anyhow!(e.to_string())) + } + _ => { + tracing::debug!("ignoring federation event: {:?}", event); + Ok(()) + } + } + } +} diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index 9dd9168..e011882 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -1,5 +1,6 @@ pub mod composite_handler; pub mod event_handler; +pub mod federation_event_bridge; pub mod objects; pub mod port; pub mod remote_review_repository; @@ -10,8 +11,9 @@ pub mod watchlist_handler; // Re-export the generic base types that callers need pub use k_ap::{ - ActivityPubService, ApFederationConfig, ApObjectHandler, ApUser, ApUserRepository, - FederationData, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, + ActivityPubService, ActivityRepository, ActorRepository, ApContentReader, ApFederationConfig, + ApObjectHandler, ApUser, ApUserRepository, BlocklistRepository, FederationData, + FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, }; pub use event_handler::ActivityPubEventHandler; @@ -27,14 +29,17 @@ pub struct ActivityPubWire { } pub async fn wire( - federation_repo: std::sync::Arc, + activity_repo: std::sync::Arc, + follow_repo: std::sync::Arc, + actor_repo: std::sync::Arc, + blocklist_repo: std::sync::Arc, review_store: std::sync::Arc, remote_watchlist_repo: std::sync::Arc, local_ap_content: std::sync::Arc, user_repo: std::sync::Arc, base_url: String, allow_registration: bool, - _event_publisher: std::sync::Arc, + event_publisher: std::sync::Arc, ) -> anyhow::Result { let review_handler = std::sync::Arc::new(ReviewObjectHandler { content_query: std::sync::Arc::clone(&local_ap_content), @@ -62,18 +67,28 @@ pub async fn wire( ); } + let fed_event_bridge = std::sync::Arc::new( + federation_event_bridge::FederationEventBridge::new(event_publisher), + ); + let concrete = std::sync::Arc::new( - ActivityPubService::builder( - federation_repo, - std::sync::Arc::new(DomainUserRepoAdapter::new(user_repo, base_url.clone())), - composite, - base_url.clone(), - ) - .allow_registration(allow_registration) - .software_name("movies-diary") - .debug(federation_debug) - .build() - .await?, + ActivityPubService::builder(base_url.clone()) + .activity_repo(activity_repo) + .follow_repo(follow_repo) + .actor_repo(actor_repo) + .blocklist_repo(blocklist_repo) + .user_repo(std::sync::Arc::new(DomainUserRepoAdapter::new( + user_repo, + base_url.clone(), + ))) + .content_reader(composite.clone() as std::sync::Arc) + .object_handler(composite as std::sync::Arc) + .event_publisher(fed_event_bridge) + .allow_registration(allow_registration) + .software_name("movies-diary") + .debug(federation_debug) + .build() + .await?, ); let router = concrete.router(); diff --git a/crates/adapters/activitypub/src/port.rs b/crates/adapters/activitypub/src/port.rs index deb0141..bcbca40 100644 --- a/crates/adapters/activitypub/src/port.rs +++ b/crates/adapters/activitypub/src/port.rs @@ -31,7 +31,9 @@ pub trait ActivityPubPort: Send + Sync { async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()>; async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()>; async fn get_blocked_domains(&self) -> anyhow::Result>; - async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>; + async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>; + async fn followers_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result; + async fn following_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result; } #[async_trait] @@ -98,8 +100,14 @@ impl ActivityPubPort for ActivityPubService { async fn get_blocked_domains(&self) -> anyhow::Result> { self.get_blocked_domains().await } - async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { - self.backfill_outbox(outbox_url, actor_url).await + async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { + self.import_remote_outbox(outbox_url, actor_url).await + } + async fn followers_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result { + self.followers_collection_json(user_id, page).await + } + async fn following_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result { + self.following_collection_json(user_id, page).await } } @@ -158,7 +166,13 @@ impl ActivityPubPort for NoopActivityPubService { async fn get_blocked_domains(&self) -> anyhow::Result> { Ok(vec![]) } - async fn backfill_outbox(&self, _: &str, _: &str) -> anyhow::Result<()> { + async fn import_remote_outbox(&self, _: &str, _: &str) -> anyhow::Result<()> { Ok(()) } + async fn followers_collection_json(&self, _: Uuid, _: Option) -> anyhow::Result { + Ok(String::new()) + } + async fn following_collection_json(&self, _: Uuid, _: Option) -> anyhow::Result { + Ok(String::new()) + } } diff --git a/crates/adapters/activitypub/src/review_handler.rs b/crates/adapters/activitypub/src/review_handler.rs index 90cbd07..2054cf6 100644 --- a/crates/adapters/activitypub/src/review_handler.rs +++ b/crates/adapters/activitypub/src/review_handler.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use k_ap::ApObjectHandler; +use k_ap::{ApContentReader, ApObjectHandler}; use async_trait::async_trait; use domain::{ models::ReviewSource, @@ -20,43 +20,7 @@ pub struct ReviewObjectHandler { } #[async_trait] -impl ApObjectHandler for ReviewObjectHandler { - async fn get_local_objects_for_user( - &self, - user_id: uuid::Uuid, - ) -> anyhow::Result> { - let domain_user_id = UserId::from_uuid(user_id); - let entries = self - .content_query - .get_local_reviews_for_user(&domain_user_id) - .await - .map_err(|e| anyhow::anyhow!(e.to_string()))?; - - let actor = actor_url(&self.base_url, user_id); - let mut results = Vec::new(); - for entry in entries { - let review = entry.review(); - let movie = entry.movie(); - - let ap_id = review_url(&self.base_url, review.id()); - let poster_url = movie - .poster_path() - .map(|p| format!("{}/images/{}", self.base_url, p.value())); - - let obj = review_to_ap_object( - review, - ap_id.clone(), - actor.clone(), - movie.title().value().to_string(), - movie.release_year().value(), - poster_url, - &self.base_url, - ); - results.push((ap_id, serde_json::to_value(obj)?)); - } - Ok(results) - } - +impl ApContentReader for ReviewObjectHandler { async fn get_local_objects_page( &self, user_id: uuid::Uuid, @@ -64,9 +28,10 @@ impl ApObjectHandler for ReviewObjectHandler { limit: usize, ) -> anyhow::Result)>> { let domain_user_id = UserId::from_uuid(user_id); + let before_naive = before.map(|dt| dt.naive_utc()); let entries = self .content_query - .get_local_reviews_for_user(&domain_user_id) + .get_local_reviews_page(&domain_user_id, before_naive, limit) .await .map_err(|e| anyhow::anyhow!(e.to_string()))?; @@ -75,13 +40,6 @@ impl ApObjectHandler for ReviewObjectHandler { for entry in entries { let review = entry.review(); let published = chrono::DateTime::from_naive_utc_and_offset(*review.watched_at(), chrono::Utc); - - if let Some(cutoff) = before - && published >= cutoff - { - continue; - } - let movie = entry.movie(); let ap_id = review_url(&self.base_url, review.id()); let poster_url = movie @@ -98,14 +56,20 @@ impl ApObjectHandler for ReviewObjectHandler { &self.base_url, ); results.push((ap_id, serde_json::to_value(obj)?, published)); - - if results.len() >= limit { - break; - } } Ok(results) } + async fn count_local_posts(&self) -> anyhow::Result { + self.content_query + .count_local_posts() + .await + .map_err(|e| anyhow::anyhow!(e.to_string())) + } +} + +#[async_trait] +impl ApObjectHandler for ReviewObjectHandler { async fn on_create( &self, _ap_id: &Url, @@ -200,13 +164,6 @@ impl ApObjectHandler for ReviewObjectHandler { self.review_store.delete_by_actor(actor_url.as_str()).await } - async fn count_local_posts(&self) -> anyhow::Result { - self.content_query - .count_local_posts() - .await - .map_err(|e| anyhow::anyhow!(e.to_string())) - } - async fn on_like(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { Ok(()) } @@ -215,6 +172,10 @@ impl ApObjectHandler for ReviewObjectHandler { Ok(()) } + async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + Ok(()) + } + async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { Ok(()) } diff --git a/crates/adapters/activitypub/src/user_adapter.rs b/crates/adapters/activitypub/src/user_adapter.rs index 276ab65..0bd7810 100644 --- a/crates/adapters/activitypub/src/user_adapter.rs +++ b/crates/adapters/activitypub/src/user_adapter.rs @@ -26,10 +26,14 @@ impl DomainUserRepoAdapter { ApUser { id: u.id().value(), username: u.username().value().to_string(), + display_name: u.display_name().map(|s| s.to_string()), bio: u.bio().map(|s| s.to_string()), avatar_url, banner_url, - also_known_as: u.also_known_as().map(|s| s.to_string()), + also_known_as: u + .also_known_as() + .map(|s| vec![s.to_string()]) + .unwrap_or_default(), profile_url, attachment: u .profile_fields() @@ -39,6 +43,15 @@ impl DomainUserRepoAdapter { value: f.value.clone(), }) .collect(), + manually_approves_followers: true, + discoverable: true, + actor_type: Default::default(), + featured_url: Url::parse(&format!( + "{}/users/{}/featured", + self.base_url, + u.id().value() + )) + .ok(), } } } diff --git a/crates/adapters/activitypub/src/watchlist_handler.rs b/crates/adapters/activitypub/src/watchlist_handler.rs index 9ac843d..45c8421 100644 --- a/crates/adapters/activitypub/src/watchlist_handler.rs +++ b/crates/adapters/activitypub/src/watchlist_handler.rs @@ -2,15 +2,13 @@ use std::sync::Arc; use k_ap::ApObjectHandler; use async_trait::async_trait; -use chrono::Utc; use domain::{ models::RemoteWatchlistEntry, ports::{LocalApContentQuery, RemoteWatchlistRepository}, - value_objects::UserId, }; use url::Url; -use crate::{objects::{WatchlistObject, watchlist_to_ap_object}, urls::{actor_url, watchlist_entry_url}}; +use crate::objects::WatchlistObject; pub struct WatchlistObjectHandler { pub remote_watchlist_repo: Arc, @@ -20,55 +18,6 @@ pub struct WatchlistObjectHandler { #[async_trait] impl ApObjectHandler for WatchlistObjectHandler { - async fn get_local_objects_for_user( - &self, - user_id: uuid::Uuid, - ) -> anyhow::Result> { - let domain_user_id = UserId::from_uuid(user_id); - let entries = self - .content_query - .get_local_watchlist_for_user(&domain_user_id) - .await - .map_err(|e| anyhow::anyhow!(e.to_string()))?; - - let actor = actor_url(&self.base_url, user_id); - let mut results = Vec::new(); - for wm in entries { - let movie_id = wm.entry.movie_id.value(); - let ap_id = watchlist_entry_url(&self.base_url, user_id, movie_id); - let added_at = chrono::DateTime::from_naive_utc_and_offset(wm.entry.added_at, Utc); - let external_metadata_id = wm - .movie - .external_metadata_id() - .map(|id| id.value().to_string()); - let poster_url = wm - .movie - .poster_path() - .map(|p| format!("{}/images/{}", self.base_url, p.value())); - let obj = watchlist_to_ap_object( - ap_id.clone(), - actor.clone(), - wm.movie.title().value().to_string(), - wm.movie.release_year().value(), - external_metadata_id, - poster_url, - added_at, - &self.base_url, - ); - results.push((ap_id, serde_json::to_value(obj)?)); - } - Ok(results) - } - - async fn get_local_objects_page( - &self, - _user_id: uuid::Uuid, - _before: Option>, - _limit: usize, - ) -> anyhow::Result)>> { - Ok(vec![]) - } - async fn on_create( &self, ap_id: &Url, @@ -115,10 +64,6 @@ impl ApObjectHandler for WatchlistObjectHandler { Ok(()) } - async fn count_local_posts(&self) -> anyhow::Result { - Ok(0) - } - async fn on_like(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { Ok(()) } @@ -127,6 +72,10 @@ impl ApObjectHandler for WatchlistObjectHandler { Ok(()) } + async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + Ok(()) + } + async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { Ok(()) } diff --git a/crates/adapters/postgres-federation/Cargo.toml b/crates/adapters/postgres-federation/Cargo.toml index f1e8600..28ab5f1 100644 --- a/crates/adapters/postgres-federation/Cargo.toml +++ b/crates/adapters/postgres-federation/Cargo.toml @@ -11,12 +11,12 @@ sqlx = { version = "0.8.6", features = [ "macros", "chrono", ] } -activitypub = { workspace = true } -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } -domain = { workspace = true } -uuid = { workspace = true } -chrono = { workspace = true } -tracing = { workspace = true } -async-trait = { workspace = true } -anyhow = { workspace = true } - +activitypub = { workspace = true } +k-ap = { version = "0.3.1", registry = "gitea" } +domain = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } +anyhow = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index bd8aa79..085d6a1 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -5,7 +5,8 @@ use sqlx::{PgPool, Row}; use activitypub::RemoteReviewRepository; use k_ap::{ - BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, + ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository, + Follower, FollowerStatus, FollowingStatus, RemoteActor, }; use domain::models::{RemoteWatchlistEntry, Review, ReviewSource}; use domain::ports::RemoteWatchlistRepository; @@ -40,8 +41,32 @@ fn str_to_status(s: &str) -> FollowerStatus { } } +fn pg_remote_actor(row: &sqlx::postgres::PgRow, url_col: &str) -> RemoteActor { + RemoteActor { + url: row.get(url_col), + handle: row.try_get("handle").unwrap_or_default(), + inbox_url: row.try_get("inbox_url").unwrap_or_default(), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + avatar_url: row.try_get("avatar_url").ok().flatten(), + outbox_url: row.try_get("outbox_url").ok().flatten(), + bio: row.try_get("bio").ok().flatten(), + banner_url: row.try_get("banner_url").ok().flatten(), + followers_url: row.try_get("followers_url").ok().flatten(), + following_url: row.try_get("following_url").ok().flatten(), + also_known_as: row + .try_get::, _>("also_known_as") + .ok() + .flatten() + .map(|s| serde_json::from_str(&s).unwrap_or_default()) + .unwrap_or_default(), + } +} + +const PG_ACTOR_COLS: &str = "a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url, a.outbox_url, a.bio, a.banner_url, a.followers_url, a.following_url, a.also_known_as"; + #[async_trait] -impl FederationRepository for PostgresFederationRepository { +impl FollowRepository for PostgresFederationRepository { async fn add_follower( &self, local_user_id: uuid::Uuid, @@ -86,11 +111,7 @@ impl FederationRepository for PostgresFederationRepository { Ok(row) } - async fn remove_follower( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result<()> { + async fn remove_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<()> { let uid = local_user_id.to_string(); sqlx::query("DELETE FROM ap_followers WHERE local_user_id = $1 AND remote_actor_url = $2") .bind(&uid) @@ -102,573 +123,326 @@ impl FederationRepository for PostgresFederationRepository { async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result> { let uid = local_user_id.to_string(); - let rows = sqlx::query( - "SELECT f.remote_actor_url, f.status, - a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url - FROM ap_followers f - LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url - WHERE f.local_user_id = $1", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await?; - Ok(rows - .into_iter() - .map(|row| { - let url: String = row.get("remote_actor_url"); - let status_str: String = row.get("status"); - let handle: String = row.try_get("handle").unwrap_or_default(); - let inbox_url: String = row.try_get("inbox_url").unwrap_or_default(); - let shared_inbox_url: Option = - row.try_get("shared_inbox_url").ok().flatten(); - let display_name: Option = row.try_get("display_name").ok().flatten(); - let avatar_url: Option = row.try_get("avatar_url").ok().flatten(); - Follower { - actor: RemoteActor { - url, - handle, - inbox_url, - shared_inbox_url, - display_name, - avatar_url, - outbox_url: row.try_get("outbox_url").ok().flatten(), - }, - status: str_to_status(&status_str), - } - }) - .collect()) + let q = format!( + "SELECT f.remote_actor_url, f.status, {PG_ACTOR_COLS} + FROM ap_followers f LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = $1" + ); + let rows = sqlx::query(&q).bind(&uid).fetch_all(&self.pool).await?; + Ok(rows.iter().map(|row| { + let status_str: String = row.get("status"); + Follower { actor: pg_remote_actor(row, "remote_actor_url"), status: str_to_status(&status_str) } + }).collect()) } - async fn get_followers_page( - &self, - local_user_id: uuid::Uuid, - offset: u32, - limit: usize, - ) -> Result> { + async fn get_followers_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result> { let uid = local_user_id.to_string(); - let limit_i64 = limit as i64; - let offset_i64 = offset as i64; - - let rows = sqlx::query( - "SELECT f.remote_actor_url, f.status, - a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url - FROM ap_followers f - LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + let q = format!( + "SELECT f.remote_actor_url, f.status, {PG_ACTOR_COLS} + FROM ap_followers f LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url WHERE f.local_user_id = $1 AND f.status = 'accepted' - ORDER BY f.created_at ASC - LIMIT $2 OFFSET $3", - ) - .bind(&uid) - .bind(limit_i64) - .bind(offset_i64) - .fetch_all(&self.pool) - .await?; - - Ok(rows - .into_iter() - .map(|row| { - let url: String = row.get("remote_actor_url"); - let status_str: String = row.get("status"); - let handle: String = row.try_get("handle").unwrap_or_default(); - let inbox_url: String = row.try_get("inbox_url").unwrap_or_default(); - let shared_inbox_url: Option = - row.try_get("shared_inbox_url").ok().flatten(); - let display_name: Option = row.try_get("display_name").ok().flatten(); - let avatar_url: Option = row.try_get("avatar_url").ok().flatten(); - Follower { - actor: RemoteActor { - url, - handle, - inbox_url, - shared_inbox_url, - display_name, - avatar_url, - outbox_url: row.try_get("outbox_url").ok().flatten(), - }, - status: str_to_status(&status_str), - } - }) - .collect()) + ORDER BY f.created_at ASC LIMIT $2 OFFSET $3" + ); + let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?; + Ok(rows.iter().map(|row| { + let status_str: String = row.get("status"); + Follower { actor: pg_remote_actor(row, "remote_actor_url"), status: str_to_status(&status_str) } + }).collect()) } async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result { let uid = local_user_id.to_string(); let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM ap_followers WHERE local_user_id = $1 AND status = 'accepted'", - ) - .bind(&uid) - .fetch_one(&self.pool) - .await?; + ).bind(&uid).fetch_one(&self.pool).await?; Ok(count as usize) } - async fn update_follower_status( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - status: FollowerStatus, - ) -> Result<()> { + async fn update_follower_status(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus) -> Result<()> { let uid = local_user_id.to_string(); let status_str = status_to_str(&status); let result = sqlx::query( "UPDATE ap_followers SET status = $1 WHERE local_user_id = $2 AND remote_actor_url = $3", - ) - .bind(status_str) - .bind(&uid) - .bind(remote_actor_url) - .execute(&self.pool) - .await?; + ).bind(status_str).bind(&uid).bind(remote_actor_url).execute(&self.pool).await?; if result.rows_affected() == 0 { tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_follower_status: no row found"); } Ok(()) } - async fn add_following( - &self, - local_user_id: uuid::Uuid, - actor: RemoteActor, - follow_activity_id: &str, - ) -> Result<()> { + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + let q = format!( + "SELECT f.remote_actor_url, {PG_ACTOR_COLS} + FROM ap_followers f LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = $1 AND f.status = 'pending'" + ); + let rows = sqlx::query(&q).bind(&uid).fetch_all(&self.pool).await?; + Ok(rows.iter().map(|row| pg_remote_actor(row, "remote_actor_url")).collect()) + } + + async fn get_accepted_follower_inboxes(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + let rows = sqlx::query( + "SELECT DISTINCT COALESCE(a.shared_inbox_url, a.inbox_url) as inbox + FROM ap_followers f + INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = $1 AND f.status = 'accepted' + AND f.remote_actor_url NOT IN ( + SELECT remote_actor_url FROM blocked_actors WHERE local_user_id = $1 + )", + ).bind(&uid).fetch_all(&self.pool).await?; + Ok(rows.iter().filter_map(|r| r.try_get::("inbox").ok()).collect()) + } + + async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result { + let uid = local_user_id.to_string(); + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM ap_followers WHERE local_user_id = $1 AND status = 'accepted'", + ).bind(&uid).fetch_one(&self.pool).await?; + Ok(count as usize) + } + + async fn get_accepted_followers_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result> { + let uid = local_user_id.to_string(); + let q = format!( + "SELECT f.remote_actor_url, {PG_ACTOR_COLS} + FROM ap_followers f LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = $1 AND f.status = 'accepted' + ORDER BY f.created_at ASC LIMIT $2 OFFSET $3" + ); + let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?; + Ok(rows.iter().map(|row| pg_remote_actor(row, "remote_actor_url")).collect()) + } + + async fn add_following(&self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str) -> Result<()> { let uid = local_user_id.to_string(); let now = Utc::now().naive_utc(); let created_at = datetime_to_str(&now); - self.upsert_remote_actor(actor.clone()).await?; + ActorRepository::upsert_remote_actor(self, actor.clone()).await?; sqlx::query( "INSERT INTO ap_following (local_user_id, remote_actor_url, follow_activity_id, created_at) - VALUES ($1, $2, $3, $4::timestamptz) - ON CONFLICT DO NOTHING", - ) - .bind(&uid) - .bind(&actor.url) - .bind(follow_activity_id) - .bind(&created_at) - .execute(&self.pool) - .await?; + VALUES ($1, $2, $3, $4::timestamptz) ON CONFLICT DO NOTHING", + ).bind(&uid).bind(&actor.url).bind(follow_activity_id).bind(&created_at).execute(&self.pool).await?; Ok(()) } - async fn get_follow_activity_id( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result> { + async fn get_follow_activity_id(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result> { let uid = local_user_id.to_string(); let row: Option = sqlx::query_scalar( "SELECT follow_activity_id FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2", - ) - .bind(&uid) - .bind(remote_actor_url) - .fetch_optional(&self.pool) - .await?; + ).bind(&uid).bind(remote_actor_url).fetch_optional(&self.pool).await?; Ok(row) } async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { let uid = local_user_id.to_string(); sqlx::query("DELETE FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2") - .bind(&uid) - .bind(actor_url) - .execute(&self.pool) - .await?; + .bind(&uid).bind(actor_url).execute(&self.pool).await?; Ok(()) } async fn get_following(&self, local_user_id: uuid::Uuid) -> Result> { let uid = local_user_id.to_string(); - let rows = sqlx::query( - "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url - FROM ap_following f - INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url - WHERE f.local_user_id = $1 AND f.status = 'accepted'", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await?; - Ok(rows - .into_iter() - .map(|row| RemoteActor { - url: row.get("url"), - handle: row.get("handle"), - inbox_url: row.get("inbox_url"), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - avatar_url: row.try_get("avatar_url").ok().flatten(), - outbox_url: row.try_get("outbox_url").ok().flatten(), - }) - .collect()) + let q = format!( + "SELECT a.url, {PG_ACTOR_COLS} + FROM ap_following f INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = $1 AND f.status = 'accepted'" + ); + let rows = sqlx::query(&q).bind(&uid).fetch_all(&self.pool).await?; + Ok(rows.iter().map(|row| pg_remote_actor(row, "url")).collect()) } async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { let uid = local_user_id.to_string(); let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM ap_following WHERE local_user_id = $1 AND status = 'accepted'", - ) - .bind(&uid) - .fetch_one(&self.pool) - .await?; + ).bind(&uid).fetch_one(&self.pool).await?; Ok(count as usize) } - async fn get_following_page( - &self, - local_user_id: uuid::Uuid, - offset: u32, - limit: usize, - ) -> Result> { + async fn get_following_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result> { let uid = local_user_id.to_string(); - let limit_i64 = limit as i64; - let offset_i64 = offset as i64; - - let rows = sqlx::query( - "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url - FROM ap_following f - INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url + let q = format!( + "SELECT a.url, {PG_ACTOR_COLS} + FROM ap_following f INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url WHERE f.local_user_id = $1 AND f.status = 'accepted' - ORDER BY f.created_at ASC - LIMIT $2 OFFSET $3", - ) - .bind(&uid) - .bind(limit_i64) - .bind(offset_i64) - .fetch_all(&self.pool) - .await?; - - Ok(rows - .into_iter() - .map(|row| RemoteActor { - url: row.get("url"), - handle: row.get("handle"), - inbox_url: row.get("inbox_url"), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - avatar_url: row.try_get("avatar_url").ok().flatten(), - outbox_url: row.try_get("outbox_url").ok().flatten(), - }) - .collect()) + ORDER BY f.created_at ASC LIMIT $2 OFFSET $3" + ); + let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?; + Ok(rows.iter().map(|row| pg_remote_actor(row, "url")).collect()) } - async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { - let now = Utc::now().naive_utc(); - let fetched_at = datetime_to_str(&now); - sqlx::query( - "INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url, fetched_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8::timestamptz) - ON CONFLICT(url) DO UPDATE SET - handle = EXCLUDED.handle, - inbox_url = EXCLUDED.inbox_url, - shared_inbox_url = EXCLUDED.shared_inbox_url, - display_name = EXCLUDED.display_name, - avatar_url = EXCLUDED.avatar_url, - outbox_url = COALESCE(EXCLUDED.outbox_url, ap_remote_actors.outbox_url), - fetched_at = EXCLUDED.fetched_at", - ) - .bind(&actor.url) - .bind(&actor.handle) - .bind(&actor.inbox_url) - .bind(&actor.shared_inbox_url) - .bind(&actor.display_name) - .bind(&actor.avatar_url) - .bind(&actor.outbox_url) - .bind(&fetched_at) - .execute(&self.pool) - .await?; - Ok(()) - } - - async fn get_remote_actor(&self, actor_url: &str) -> Result> { - let row = sqlx::query( - "SELECT url, handle, inbox_url, shared_inbox_url, display_name, avatar_url - FROM ap_remote_actors WHERE url = $1", - ) - .bind(actor_url) - .fetch_optional(&self.pool) - .await?; - Ok(row.map(|row| RemoteActor { - url: row.get("url"), - handle: row.get("handle"), - inbox_url: row.get("inbox_url"), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - avatar_url: row.try_get("avatar_url").ok().flatten(), - outbox_url: row.try_get("outbox_url").ok().flatten(), - })) - } - - async fn get_local_actor_keypair( - &self, - user_id: uuid::Uuid, - ) -> Result> { - let uid = user_id.to_string(); - let row = - sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = $1") - .bind(&uid) - .fetch_optional(&self.pool) - .await?; - Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) - } - - async fn save_local_actor_keypair( - &self, - user_id: uuid::Uuid, - public_key: String, - private_key: String, - ) -> Result<()> { - let uid = user_id.to_string(); - let now = Utc::now().naive_utc(); - let created_at = datetime_to_str(&now); - sqlx::query( - "INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at) - VALUES ($1, $2, $3, $4::timestamptz) - ON CONFLICT(user_id) DO UPDATE SET - public_key = EXCLUDED.public_key, - private_key = EXCLUDED.private_key", - ) - .bind(&uid) - .bind(&public_key) - .bind(&private_key) - .bind(&created_at) - .execute(&self.pool) - .await?; - Ok(()) - } - - async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { + async fn update_following_status(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowingStatus) -> Result<()> { let uid = local_user_id.to_string(); - let rows = sqlx::query( - "SELECT f.remote_actor_url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url - FROM ap_followers f - LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url - WHERE f.local_user_id = $1 AND f.status = 'pending'", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await?; - Ok(rows - .into_iter() - .map(|row| RemoteActor { - url: row.get("remote_actor_url"), - handle: row.try_get("handle").unwrap_or_default(), - inbox_url: row.try_get("inbox_url").unwrap_or_default(), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - avatar_url: row.try_get("avatar_url").ok().flatten(), - outbox_url: row.try_get("outbox_url").ok().flatten(), - }) - .collect()) - } - - async fn update_following_status( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - status: FollowingStatus, - ) -> Result<()> { - let uid = local_user_id.to_string(); - let status_str = match status { - FollowingStatus::Pending => "pending", - FollowingStatus::Accepted => "accepted", - }; + let status_str = match status { FollowingStatus::Pending => "pending", FollowingStatus::Accepted => "accepted" }; let result = sqlx::query( "UPDATE ap_following SET status = $1 WHERE local_user_id = $2 AND remote_actor_url = $3", - ) - .bind(status_str) - .bind(&uid) - .bind(remote_actor_url) - .execute(&self.pool) - .await?; + ).bind(status_str).bind(&uid).bind(remote_actor_url).execute(&self.pool).await?; if result.rows_affected() == 0 { tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_following_status: no row found"); } Ok(()) } - async fn get_following_outbox_url( - &self, - local_user_id: uuid::Uuid, - remote_actor_url: &str, - ) -> Result> { + async fn get_following_outbox_url(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result> { let uid = local_user_id.to_string(); let row: Option> = sqlx::query_scalar( - "SELECT a.outbox_url - FROM ap_following f - INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url + "SELECT a.outbox_url FROM ap_following f INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url WHERE f.local_user_id = $1 AND f.remote_actor_url = $2", - ) - .bind(&uid) - .bind(remote_actor_url) - .fetch_optional(&self.pool) - .await?; + ).bind(&uid).bind(remote_actor_url).fetch_optional(&self.pool).await?; Ok(row.flatten()) } - async fn add_announce( - &self, - activity_id: &str, - object_url: &str, - actor_url: &str, - announced_at: chrono::DateTime, - ) -> Result<()> { - let ts = announced_at.format("%Y-%m-%d %H:%M:%S").to_string(); + async fn migrate_follower_actor(&self, old_actor_url: &str, new_actor_url: &str) -> Result> { + let candidates: Vec = sqlx::query_scalar( + "SELECT local_user_id FROM ap_following WHERE remote_actor_url = $1 + AND local_user_id NOT IN (SELECT local_user_id FROM ap_following WHERE remote_actor_url = $2)", + ).bind(old_actor_url).bind(new_actor_url).fetch_all(&self.pool).await?; + + if candidates.is_empty() { return Ok(vec![]); } + sqlx::query( - "INSERT INTO ap_announces (id, object_url, actor_url, announced_at) VALUES ($1, $2, $3, $4) ON CONFLICT (id) DO NOTHING", + "UPDATE ap_following SET remote_actor_url = $1 WHERE remote_actor_url = $2 + AND local_user_id NOT IN (SELECT local_user_id FROM ap_following WHERE remote_actor_url = $1)", + ).bind(new_actor_url).bind(old_actor_url).execute(&self.pool).await?; + + candidates.into_iter().map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))).collect() + } +} + +#[async_trait] +impl ActorRepository for PostgresFederationRepository { + async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result> { + let uid = user_id.to_string(); + let row = sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = $1") + .bind(&uid).fetch_optional(&self.pool).await?; + Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) + } + + async fn save_local_actor_keypair(&self, user_id: uuid::Uuid, public_key: String, private_key: String) -> Result<()> { + let uid = user_id.to_string(); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + sqlx::query( + "INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at) + VALUES ($1, $2, $3, $4::timestamptz) + ON CONFLICT(user_id) DO UPDATE SET public_key = EXCLUDED.public_key, private_key = EXCLUDED.private_key", + ).bind(&uid).bind(&public_key).bind(&private_key).bind(&created_at).execute(&self.pool).await?; + Ok(()) + } + + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { + let now = Utc::now().naive_utc(); + let fetched_at = datetime_to_str(&now); + let aka_json = serde_json::to_string(&actor.also_known_as).unwrap_or_default(); + sqlx::query( + "INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url, bio, banner_url, followers_url, following_url, also_known_as, fetched_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13::timestamptz) + ON CONFLICT(url) DO UPDATE SET + handle=EXCLUDED.handle, inbox_url=EXCLUDED.inbox_url, shared_inbox_url=EXCLUDED.shared_inbox_url, + display_name=EXCLUDED.display_name, avatar_url=EXCLUDED.avatar_url, + outbox_url=COALESCE(EXCLUDED.outbox_url, ap_remote_actors.outbox_url), + bio=EXCLUDED.bio, banner_url=EXCLUDED.banner_url, followers_url=EXCLUDED.followers_url, + following_url=EXCLUDED.following_url, also_known_as=EXCLUDED.also_known_as, fetched_at=EXCLUDED.fetched_at", ) - .bind(activity_id) - .bind(object_url) - .bind(actor_url) - .bind(&ts) - .execute(&self.pool) - .await?; + .bind(&actor.url).bind(&actor.handle).bind(&actor.inbox_url).bind(&actor.shared_inbox_url) + .bind(&actor.display_name).bind(&actor.avatar_url).bind(&actor.outbox_url) + .bind(&actor.bio).bind(&actor.banner_url).bind(&actor.followers_url).bind(&actor.following_url) + .bind(&aka_json).bind(&fetched_at) + .execute(&self.pool).await?; + Ok(()) + } + + async fn get_remote_actor(&self, actor_url: &str) -> Result> { + let q = format!( + "SELECT url, {PG_ACTOR_COLS} FROM ap_remote_actors a WHERE url = $1" + ); + let row = sqlx::query(&q).bind(actor_url).fetch_optional(&self.pool).await?; + Ok(row.as_ref().map(|r| pg_remote_actor(r, "url"))) + } + + async fn add_announce(&self, activity_id: &str, object_url: &str, actor_url: &str, announced_at: chrono::DateTime) -> Result<()> { + let ts = announced_at.format("%Y-%m-%d %H:%M:%S").to_string(); + sqlx::query("INSERT INTO ap_announces (id, object_url, actor_url, announced_at) VALUES ($1, $2, $3, $4) ON CONFLICT (id) DO NOTHING") + .bind(activity_id).bind(object_url).bind(actor_url).bind(&ts).execute(&self.pool).await?; + Ok(()) + } + + async fn remove_announce(&self, activity_id: &str, actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM ap_announces WHERE id = $1 AND actor_url = $2") + .bind(activity_id).bind(actor_url).execute(&self.pool).await?; Ok(()) } async fn count_announces(&self, object_url: &str) -> Result { let row = sqlx::query("SELECT COUNT(*) as cnt FROM ap_announces WHERE object_url = $1") - .bind(object_url) - .fetch_one(&self.pool) - .await?; + .bind(object_url).fetch_one(&self.pool).await?; Ok(row.get::("cnt") as usize) } +} +#[async_trait] +impl BlocklistRepository for PostgresFederationRepository { async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()> { - let now = Utc::now().naive_utc(); - let ts = datetime_to_str(&now); - sqlx::query( - "INSERT INTO blocked_domains (domain, reason, blocked_at) VALUES ($1, $2, $3) - ON CONFLICT(domain) DO UPDATE SET reason = EXCLUDED.reason", - ) - .bind(domain) - .bind(reason) - .bind(&ts) - .execute(&self.pool) - .await?; + let ts = datetime_to_str(&Utc::now().naive_utc()); + sqlx::query("INSERT INTO blocked_domains (domain, reason, blocked_at) VALUES ($1, $2, $3) ON CONFLICT(domain) DO UPDATE SET reason = EXCLUDED.reason") + .bind(domain).bind(reason).bind(&ts).execute(&self.pool).await?; Ok(()) } - async fn remove_blocked_domain(&self, domain: &str) -> Result<()> { - sqlx::query("DELETE FROM blocked_domains WHERE domain = $1") - .bind(domain) - .execute(&self.pool) - .await?; + sqlx::query("DELETE FROM blocked_domains WHERE domain = $1").bind(domain).execute(&self.pool).await?; Ok(()) } - async fn get_blocked_domains(&self) -> Result> { - let rows = sqlx::query( - "SELECT domain, reason, blocked_at FROM blocked_domains ORDER BY blocked_at DESC", - ) - .fetch_all(&self.pool) - .await?; - Ok(rows - .iter() - .map(|r| BlockedDomain { - domain: r.get("domain"), - reason: r.get("reason"), - blocked_at: r.get("blocked_at"), - }) - .collect()) + let rows = sqlx::query("SELECT domain, reason, blocked_at FROM blocked_domains ORDER BY blocked_at DESC") + .fetch_all(&self.pool).await?; + Ok(rows.iter().map(|r| BlockedDomain { domain: r.get("domain"), reason: r.get("reason"), blocked_at: r.get("blocked_at") }).collect()) } - async fn is_domain_blocked(&self, domain: &str) -> Result { - let count: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM blocked_domains WHERE domain = $1") - .bind(domain) - .fetch_one(&self.pool) - .await?; + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM blocked_domains WHERE domain = $1") + .bind(domain).fetch_one(&self.pool).await?; Ok(count > 0) } - async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { let uid = local_user_id.to_string(); let ts = datetime_to_str(&Utc::now().naive_utc()); - sqlx::query( - "INSERT INTO blocked_actors (local_user_id, remote_actor_url, blocked_at) - VALUES ($1, $2, $3) - ON CONFLICT DO NOTHING", - ) - .bind(&uid) - .bind(actor_url) - .bind(&ts) - .execute(&self.pool) - .await?; + sqlx::query("INSERT INTO blocked_actors (local_user_id, remote_actor_url, blocked_at) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING") + .bind(&uid).bind(actor_url).bind(&ts).execute(&self.pool).await?; Ok(()) } - async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { let uid = local_user_id.to_string(); - sqlx::query( - "DELETE FROM blocked_actors WHERE local_user_id = $1 AND remote_actor_url = $2", - ) - .bind(&uid) - .bind(actor_url) - .execute(&self.pool) - .await?; + sqlx::query("DELETE FROM blocked_actors WHERE local_user_id = $1 AND remote_actor_url = $2") + .bind(&uid).bind(actor_url).execute(&self.pool).await?; Ok(()) } - async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result> { let uid = local_user_id.to_string(); - let rows = sqlx::query( - "SELECT remote_actor_url FROM blocked_actors WHERE local_user_id = $1 ORDER BY blocked_at DESC", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await?; - Ok(rows - .iter() - .map(|r| r.get::("remote_actor_url")) - .collect()) + let rows = sqlx::query("SELECT remote_actor_url FROM blocked_actors WHERE local_user_id = $1 ORDER BY blocked_at DESC") + .bind(&uid).fetch_all(&self.pool).await?; + Ok(rows.iter().map(|r| r.get::("remote_actor_url")).collect()) } - async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result { let uid = local_user_id.to_string(); - let count: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM blocked_actors WHERE local_user_id = $1 AND remote_actor_url = $2", - ) - .bind(&uid) - .bind(actor_url) - .fetch_one(&self.pool) - .await?; + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM blocked_actors WHERE local_user_id = $1 AND remote_actor_url = $2") + .bind(&uid).bind(actor_url).fetch_one(&self.pool).await?; Ok(count > 0) } +} - async fn migrate_follower_actor( - &self, - old_actor_url: &str, - new_actor_url: &str, - ) -> Result> { - let candidates: Vec = sqlx::query_scalar( - "SELECT local_user_id FROM ap_following - WHERE remote_actor_url = $1 - AND local_user_id NOT IN ( - SELECT local_user_id FROM ap_following WHERE remote_actor_url = $2 - )", - ) - .bind(old_actor_url) - .bind(new_actor_url) - .fetch_all(&self.pool) - .await?; - - if candidates.is_empty() { - return Ok(vec![]); - } - - sqlx::query( - "UPDATE ap_following SET remote_actor_url = $1 - WHERE remote_actor_url = $2 - AND local_user_id NOT IN ( - SELECT local_user_id FROM ap_following WHERE remote_actor_url = $1 - )", - ) - .bind(new_actor_url) - .bind(old_actor_url) - .execute(&self.pool) - .await?; - - candidates - .into_iter() - .map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))) - .collect() +#[async_trait] +impl ActivityRepository for PostgresFederationRepository { + async fn is_activity_processed(&self, activity_id: &str) -> Result { + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM ap_activities WHERE id = $1") + .bind(activity_id).fetch_one(&self.pool).await?; + Ok(count > 0) + } + async fn mark_activity_processed(&self, activity_id: &str) -> Result<()> { + let ts = datetime_to_str(&Utc::now().naive_utc()); + sqlx::query("INSERT INTO ap_activities (id, processed_at) VALUES ($1, $2) ON CONFLICT DO NOTHING") + .bind(activity_id).bind(&ts).execute(&self.pool).await?; + Ok(()) } } @@ -916,13 +690,19 @@ impl RemoteWatchlistRepository for PostgresFederationRepository { pub fn wire( pool: sqlx::PgPool, ) -> ( - std::sync::Arc, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, std::sync::Arc, std::sync::Arc, std::sync::Arc, ) { let fed = std::sync::Arc::new(PostgresFederationRepository::new(pool)); ( + std::sync::Arc::clone(&fed) as _, + std::sync::Arc::clone(&fed) as _, + std::sync::Arc::clone(&fed) as _, std::sync::Arc::clone(&fed) as _, std::sync::Arc::clone(&fed) as _, std::sync::Arc::clone(&fed) as _, diff --git a/crates/adapters/postgres/migrations/0020_kap_v03.sql b/crates/adapters/postgres/migrations/0020_kap_v03.sql new file mode 100644 index 0000000..fbbb423 --- /dev/null +++ b/crates/adapters/postgres/migrations/0020_kap_v03.sql @@ -0,0 +1,12 @@ +-- k-ap 0.3: new RemoteActor fields + activity dedup table + +ALTER TABLE ap_remote_actors ADD COLUMN IF NOT EXISTS bio TEXT; +ALTER TABLE ap_remote_actors ADD COLUMN IF NOT EXISTS banner_url TEXT; +ALTER TABLE ap_remote_actors ADD COLUMN IF NOT EXISTS followers_url TEXT; +ALTER TABLE ap_remote_actors ADD COLUMN IF NOT EXISTS following_url TEXT; +ALTER TABLE ap_remote_actors ADD COLUMN IF NOT EXISTS also_known_as TEXT; -- JSON array + +CREATE TABLE IF NOT EXISTS ap_activities ( + id TEXT PRIMARY KEY, + processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/crates/adapters/postgres/migrations/0021_user_display_name.sql b/crates/adapters/postgres/migrations/0021_user_display_name.sql new file mode 100644 index 0000000..00efce8 --- /dev/null +++ b/crates/adapters/postgres/migrations/0021_user_display_name.sql @@ -0,0 +1 @@ +ALTER TABLE users ADD COLUMN IF NOT EXISTS display_name TEXT; diff --git a/crates/adapters/postgres/src/ap_content.rs b/crates/adapters/postgres/src/ap_content.rs index a5fc5ae..aae0a2a 100644 --- a/crates/adapters/postgres/src/ap_content.rs +++ b/crates/adapters/postgres/src/ap_content.rs @@ -145,4 +145,55 @@ impl LocalApContentQuery for PostgresApContentQuery { .map_err(Self::map_err)?; Ok(count as u64) } + + async fn get_local_reviews_page( + &self, + user_id: &UserId, + before: Option, + limit: usize, + ) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let limit_i64 = limit as i64; + + let rows = if let Some(before_ts) = before { + let ts = before_ts.format("%Y-%m-%d %H:%M:%S").to_string(); + sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND r.remote_actor_url IS NULL AND r.watched_at < $2::timestamptz + ORDER BY r.watched_at DESC + LIMIT $3", + ) + .bind(&uid) + .bind(&ts) + .bind(limit_i64) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + } else { + sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND r.remote_actor_url IS NULL + ORDER BY r.watched_at DESC + LIMIT $2", + ) + .bind(&uid) + .bind(limit_i64) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + }; + rows.into_iter().map(DiaryRow::into_domain).collect() + } } diff --git a/crates/adapters/postgres/src/users.rs b/crates/adapters/postgres/src/users.rs index a5716ff..371be0e 100644 --- a/crates/adapters/postgres/src/users.rs +++ b/crates/adapters/postgres/src/users.rs @@ -38,6 +38,7 @@ impl PostgresUserRepository { username_str: String, hash_str: String, role: UserRole, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, @@ -58,6 +59,7 @@ impl PostgresUserRepository { username, hash, role, + display_name, bio, avatar_path, banner_path, @@ -78,13 +80,14 @@ impl UserRepository for PostgresUserRepository { username: String, password_hash: String, role: String, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, also_known_as: Option, } let row = sqlx::query_as::<_, Row>( - "SELECT id, email, username, password_hash, role, bio, avatar_path, banner_path, also_known_as FROM users WHERE email = $1", + "SELECT id, email, username, password_hash, role, display_name, bio, avatar_path, banner_path, also_known_as FROM users WHERE email = $1", ) .bind(email_str) .fetch_optional(&self.pool) @@ -97,6 +100,7 @@ impl UserRepository for PostgresUserRepository { r.username, r.password_hash, Self::parse_role(&r.role), + r.display_name, r.bio, r.avatar_path, r.banner_path, @@ -116,13 +120,14 @@ impl UserRepository for PostgresUserRepository { username: String, password_hash: String, role: String, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, also_known_as: Option, } let row = sqlx::query_as::<_, Row>( - "SELECT id, email, username, password_hash, role, bio, avatar_path, banner_path, also_known_as FROM users WHERE username = $1", + "SELECT id, email, username, password_hash, role, display_name, bio, avatar_path, banner_path, also_known_as FROM users WHERE username = $1", ) .bind(username_str) .fetch_optional(&self.pool) @@ -135,6 +140,7 @@ impl UserRepository for PostgresUserRepository { r.username, r.password_hash, Self::parse_role(&r.role), + r.display_name, r.bio, r.avatar_path, r.banner_path, @@ -192,13 +198,14 @@ impl UserRepository for PostgresUserRepository { username: String, password_hash: String, role: String, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, also_known_as: Option, } let row = sqlx::query_as::<_, Row>( - "SELECT id, email, username, password_hash, role, bio, avatar_path, banner_path, also_known_as FROM users WHERE id = $1", + "SELECT id, email, username, password_hash, role, display_name, bio, avatar_path, banner_path, also_known_as FROM users WHERE id = $1", ) .bind(&id_str) .fetch_optional(&self.pool) @@ -234,6 +241,7 @@ impl UserRepository for PostgresUserRepository { r.username, r.password_hash, Self::parse_role(&r.role), + r.display_name, r.bio, r.avatar_path, r.banner_path, @@ -246,6 +254,7 @@ impl UserRepository for PostgresUserRepository { async fn update_profile( &self, user_id: &UserId, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, @@ -253,8 +262,9 @@ impl UserRepository for PostgresUserRepository { ) -> Result<(), DomainError> { let id_str = user_id.value().to_string(); sqlx::query( - "UPDATE users SET bio = $1, avatar_path = $2, banner_path = $3, also_known_as = $4 WHERE id = $5", + "UPDATE users SET display_name = $1, bio = $2, avatar_path = $3, banner_path = $4, also_known_as = $5 WHERE id = $6", ) + .bind(&display_name) .bind(&bio) .bind(&avatar_path) .bind(&banner_path) diff --git a/crates/adapters/sqlite-federation/Cargo.toml b/crates/adapters/sqlite-federation/Cargo.toml index 12b2af8..02289e8 100644 --- a/crates/adapters/sqlite-federation/Cargo.toml +++ b/crates/adapters/sqlite-federation/Cargo.toml @@ -6,9 +6,10 @@ edition = "2024" [dependencies] sqlx = { workspace = true } activitypub = { workspace = true } -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } +k-ap = { version = "0.3.1", registry = "gitea" } domain = { workspace = true } anyhow = { workspace = true } +serde_json = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } tracing = { workspace = true } diff --git a/crates/adapters/sqlite-federation/src/lib.rs b/crates/adapters/sqlite-federation/src/lib.rs index 8bdbb07..dd00272 100644 --- a/crates/adapters/sqlite-federation/src/lib.rs +++ b/crates/adapters/sqlite-federation/src/lib.rs @@ -5,7 +5,8 @@ use sqlx::{Row, SqlitePool}; use activitypub::RemoteReviewRepository; use k_ap::{ - BlockedDomain, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, + ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository, + Follower, FollowerStatus, FollowingStatus, RemoteActor, }; use domain::models::{RemoteWatchlistEntry, Review, ReviewSource}; use domain::ports::RemoteWatchlistRepository; @@ -40,8 +41,30 @@ fn str_to_status(s: &str) -> FollowerStatus { } } +fn remote_actor_from_row(row: &sqlx::sqlite::SqliteRow, url_col: &str) -> RemoteActor { + RemoteActor { + url: row.get(url_col), + handle: row.try_get("handle").unwrap_or_default(), + inbox_url: row.try_get("inbox_url").unwrap_or_default(), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + avatar_url: row.try_get("avatar_url").ok().flatten(), + outbox_url: row.try_get("outbox_url").ok().flatten(), + bio: row.try_get("bio").ok().flatten(), + banner_url: row.try_get("banner_url").ok().flatten(), + followers_url: row.try_get("followers_url").ok().flatten(), + following_url: row.try_get("following_url").ok().flatten(), + also_known_as: row + .try_get::, _>("also_known_as") + .ok() + .flatten() + .map(|s| serde_json::from_str(&s).unwrap_or_default()) + .unwrap_or_default(), + } +} + #[async_trait] -impl FederationRepository for SqliteFederationRepository { +impl FollowRepository for SqliteFederationRepository { async fn add_follower( &self, local_user_id: uuid::Uuid, @@ -107,7 +130,8 @@ impl FederationRepository for SqliteFederationRepository { let rows = sqlx::query( "SELECT f.remote_actor_url, f.status, - a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url, + a.outbox_url, a.bio, a.banner_url, a.followers_url, a.following_url, a.also_known_as FROM ap_followers f LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url WHERE f.local_user_id = ?", @@ -116,34 +140,16 @@ impl FederationRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - let followers = rows - .into_iter() + Ok(rows + .iter() .map(|row| { - let url: String = row.get("remote_actor_url"); let status_str: String = row.get("status"); - let handle: String = row.try_get("handle").unwrap_or_default(); - let inbox_url: String = row.try_get("inbox_url").unwrap_or_default(); - let shared_inbox_url: Option = - row.try_get("shared_inbox_url").ok().flatten(); - let display_name: Option = row.try_get("display_name").ok().flatten(); - let avatar_url: Option = row.try_get("avatar_url").ok().flatten(); - Follower { - actor: RemoteActor { - url, - handle, - inbox_url, - shared_inbox_url, - display_name, - avatar_url, - outbox_url: row.try_get("outbox_url").ok().flatten(), - }, + actor: remote_actor_from_row(row, "remote_actor_url"), status: str_to_status(&status_str), } }) - .collect(); - - Ok(followers) + .collect()) } async fn get_followers_page( @@ -158,7 +164,8 @@ impl FederationRepository for SqliteFederationRepository { let rows = sqlx::query( "SELECT f.remote_actor_url, f.status, - a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url, + a.outbox_url, a.bio, a.banner_url, a.followers_url, a.following_url, a.also_known_as FROM ap_followers f LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url WHERE f.local_user_id = ? AND f.status = 'accepted' @@ -172,26 +179,11 @@ impl FederationRepository for SqliteFederationRepository { .await?; Ok(rows - .into_iter() + .iter() .map(|row| { - let url: String = row.get("remote_actor_url"); let status_str: String = row.get("status"); - let handle: String = row.try_get("handle").unwrap_or_default(); - let inbox_url: String = row.try_get("inbox_url").unwrap_or_default(); - let shared_inbox_url: Option = - row.try_get("shared_inbox_url").ok().flatten(); - let display_name: Option = row.try_get("display_name").ok().flatten(); - let avatar_url: Option = row.try_get("avatar_url").ok().flatten(); Follower { - actor: RemoteActor { - url, - handle, - inbox_url, - shared_inbox_url, - display_name, - avatar_url, - outbox_url: row.try_get("outbox_url").ok().flatten(), - }, + actor: remote_actor_from_row(row, "remote_actor_url"), status: str_to_status(&status_str), } }) @@ -234,6 +226,86 @@ impl FederationRepository for SqliteFederationRepository { Ok(()) } + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + + let rows = sqlx::query( + "SELECT f.remote_actor_url, + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url, + a.outbox_url, a.bio, a.banner_url, a.followers_url, a.following_url, a.also_known_as + FROM ap_followers f + LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ? AND f.status = 'pending'", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + + Ok(rows.iter().map(|row| remote_actor_from_row(row, "remote_actor_url")).collect()) + } + + async fn get_accepted_follower_inboxes( + &self, + local_user_id: uuid::Uuid, + ) -> Result> { + let uid = local_user_id.to_string(); + let rows = sqlx::query( + "SELECT DISTINCT COALESCE(a.shared_inbox_url, a.inbox_url) as inbox + FROM ap_followers f + INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ? AND f.status = 'accepted' + AND f.remote_actor_url NOT IN ( + SELECT remote_actor_url FROM blocked_actors WHERE local_user_id = ? + )", + ) + .bind(&uid) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + + Ok(rows.iter().filter_map(|r| r.try_get::("inbox").ok()).collect()) + } + + async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result { + let uid = local_user_id.to_string(); + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM ap_followers WHERE local_user_id = ? AND status = 'accepted'", + ) + .bind(&uid) + .fetch_one(&self.pool) + .await?; + Ok(count as usize) + } + + async fn get_accepted_followers_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result> { + let uid = local_user_id.to_string(); + let limit_i64 = limit as i64; + let offset_i64 = offset as i64; + + let rows = sqlx::query( + "SELECT f.remote_actor_url, + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url, + a.outbox_url, a.bio, a.banner_url, a.followers_url, a.following_url, a.also_known_as + FROM ap_followers f + LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ? AND f.status = 'accepted' + ORDER BY f.created_at ASC + LIMIT ? OFFSET ?", + ) + .bind(&uid) + .bind(limit_i64) + .bind(offset_i64) + .fetch_all(&self.pool) + .await?; + + Ok(rows.iter().map(|row| remote_actor_from_row(row, "remote_actor_url")).collect()) + } + async fn add_following( &self, local_user_id: uuid::Uuid, @@ -244,7 +316,7 @@ impl FederationRepository for SqliteFederationRepository { let now = Utc::now().naive_utc(); let created_at = datetime_to_str(&now); - self.upsert_remote_actor(actor.clone()).await?; + ActorRepository::upsert_remote_actor(self, actor.clone()).await?; sqlx::query( "INSERT OR IGNORE INTO ap_following (local_user_id, remote_actor_url, follow_activity_id, created_at) @@ -290,7 +362,8 @@ impl FederationRepository for SqliteFederationRepository { let uid = local_user_id.to_string(); let rows = sqlx::query( - "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url + "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url, + a.outbox_url, a.bio, a.banner_url, a.followers_url, a.following_url, a.also_known_as FROM ap_following f INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url WHERE f.local_user_id = ? AND f.status = 'accepted'", @@ -299,18 +372,7 @@ impl FederationRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - Ok(rows - .into_iter() - .map(|row| RemoteActor { - url: row.get("url"), - handle: row.get("handle"), - inbox_url: row.get("inbox_url"), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - avatar_url: row.try_get("avatar_url").ok().flatten(), - outbox_url: row.try_get("outbox_url").ok().flatten(), - }) - .collect()) + Ok(rows.iter().map(|row| remote_actor_from_row(row, "url")).collect()) } async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { @@ -335,7 +397,8 @@ impl FederationRepository for SqliteFederationRepository { let offset_i64 = offset as i64; let rows = sqlx::query( - "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url + "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url, + a.outbox_url, a.bio, a.banner_url, a.followers_url, a.following_url, a.also_known_as FROM ap_following f INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url WHERE f.local_user_id = ? AND f.status = 'accepted' @@ -348,136 +411,7 @@ impl FederationRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - Ok(rows - .into_iter() - .map(|row| RemoteActor { - url: row.get("url"), - handle: row.get("handle"), - inbox_url: row.get("inbox_url"), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - avatar_url: row.try_get("avatar_url").ok().flatten(), - outbox_url: row.try_get("outbox_url").ok().flatten(), - }) - .collect()) - } - - async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { - let now = Utc::now().naive_utc(); - let fetched_at = datetime_to_str(&now); - - sqlx::query( - "INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url, fetched_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(url) DO UPDATE SET - handle = excluded.handle, - inbox_url = excluded.inbox_url, - shared_inbox_url = excluded.shared_inbox_url, - display_name = excluded.display_name, - avatar_url = excluded.avatar_url, - outbox_url = COALESCE(excluded.outbox_url, ap_remote_actors.outbox_url), - fetched_at = excluded.fetched_at", - ) - .bind(&actor.url) - .bind(&actor.handle) - .bind(&actor.inbox_url) - .bind(&actor.shared_inbox_url) - .bind(&actor.display_name) - .bind(&actor.avatar_url) - .bind(&actor.outbox_url) - .bind(&fetched_at) - .execute(&self.pool) - .await?; - - Ok(()) - } - - async fn get_remote_actor(&self, actor_url: &str) -> Result> { - let row = sqlx::query( - "SELECT url, handle, inbox_url, shared_inbox_url, display_name, avatar_url - FROM ap_remote_actors WHERE url = ?", - ) - .bind(actor_url) - .fetch_optional(&self.pool) - .await?; - - Ok(row.map(|row| RemoteActor { - url: row.get("url"), - handle: row.get("handle"), - inbox_url: row.get("inbox_url"), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - avatar_url: row.try_get("avatar_url").ok().flatten(), - outbox_url: row.try_get("outbox_url").ok().flatten(), - })) - } - - async fn get_local_actor_keypair( - &self, - user_id: uuid::Uuid, - ) -> Result> { - let uid = user_id.to_string(); - let row = - sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = ?") - .bind(&uid) - .fetch_optional(&self.pool) - .await?; - Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) - } - - async fn save_local_actor_keypair( - &self, - user_id: uuid::Uuid, - public_key: String, - private_key: String, - ) -> Result<()> { - let uid = user_id.to_string(); - let now = Utc::now().naive_utc(); - let created_at = datetime_to_str(&now); - - sqlx::query( - "INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at) - VALUES (?, ?, ?, ?) - ON CONFLICT(user_id) DO UPDATE SET - public_key = excluded.public_key, - private_key = excluded.private_key", - ) - .bind(&uid) - .bind(&public_key) - .bind(&private_key) - .bind(&created_at) - .execute(&self.pool) - .await?; - - Ok(()) - } - - async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { - let uid = local_user_id.to_string(); - - let rows = sqlx::query( - "SELECT f.remote_actor_url, - a.handle, a.inbox_url, a.shared_inbox_url, a.display_name, a.avatar_url - FROM ap_followers f - LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url - WHERE f.local_user_id = ? AND f.status = 'pending'", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await?; - - Ok(rows - .into_iter() - .map(|row| RemoteActor { - url: row.get("remote_actor_url"), - handle: row.try_get("handle").unwrap_or_default(), - inbox_url: row.try_get("inbox_url").unwrap_or_default(), - shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), - display_name: row.try_get("display_name").ok().flatten(), - avatar_url: row.try_get("avatar_url").ok().flatten(), - outbox_url: row.try_get("outbox_url").ok().flatten(), - }) - .collect()) + Ok(rows.iter().map(|row| remote_actor_from_row(row, "url")).collect()) } async fn update_following_status( @@ -527,6 +461,142 @@ impl FederationRepository for SqliteFederationRepository { Ok(row.flatten()) } + async fn migrate_follower_actor( + &self, + old_actor_url: &str, + new_actor_url: &str, + ) -> Result> { + let candidates: Vec = sqlx::query_scalar( + "SELECT local_user_id FROM ap_following + WHERE remote_actor_url = ?1 + AND local_user_id NOT IN ( + SELECT local_user_id FROM ap_following WHERE remote_actor_url = ?2 + )", + ) + .bind(old_actor_url) + .bind(new_actor_url) + .fetch_all(&self.pool) + .await?; + + if candidates.is_empty() { + return Ok(vec![]); + } + + sqlx::query( + "UPDATE ap_following SET remote_actor_url = ?1 + WHERE remote_actor_url = ?2 + AND local_user_id NOT IN ( + SELECT local_user_id FROM ap_following WHERE remote_actor_url = ?1 + )", + ) + .bind(new_actor_url) + .bind(old_actor_url) + .execute(&self.pool) + .await?; + + candidates + .into_iter() + .map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))) + .collect() + } +} + +#[async_trait] +impl ActorRepository for SqliteFederationRepository { + async fn get_local_actor_keypair( + &self, + user_id: uuid::Uuid, + ) -> Result> { + let uid = user_id.to_string(); + let row = + sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = ?") + .bind(&uid) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) + } + + async fn save_local_actor_keypair( + &self, + user_id: uuid::Uuid, + public_key: String, + private_key: String, + ) -> Result<()> { + let uid = user_id.to_string(); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + + sqlx::query( + "INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(user_id) DO UPDATE SET + public_key = excluded.public_key, + private_key = excluded.private_key", + ) + .bind(&uid) + .bind(&public_key) + .bind(&private_key) + .bind(&created_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { + let now = Utc::now().naive_utc(); + let fetched_at = datetime_to_str(&now); + let aka_json = serde_json::to_string(&actor.also_known_as).unwrap_or_default(); + + sqlx::query( + "INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url, bio, banner_url, followers_url, following_url, also_known_as, fetched_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(url) DO UPDATE SET + handle = excluded.handle, + inbox_url = excluded.inbox_url, + shared_inbox_url = excluded.shared_inbox_url, + display_name = excluded.display_name, + avatar_url = excluded.avatar_url, + outbox_url = COALESCE(excluded.outbox_url, ap_remote_actors.outbox_url), + bio = excluded.bio, + banner_url = excluded.banner_url, + followers_url = excluded.followers_url, + following_url = excluded.following_url, + also_known_as = excluded.also_known_as, + fetched_at = excluded.fetched_at", + ) + .bind(&actor.url) + .bind(&actor.handle) + .bind(&actor.inbox_url) + .bind(&actor.shared_inbox_url) + .bind(&actor.display_name) + .bind(&actor.avatar_url) + .bind(&actor.outbox_url) + .bind(&actor.bio) + .bind(&actor.banner_url) + .bind(&actor.followers_url) + .bind(&actor.following_url) + .bind(&aka_json) + .bind(&fetched_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn get_remote_actor(&self, actor_url: &str) -> Result> { + let row = sqlx::query( + "SELECT url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, + outbox_url, bio, banner_url, followers_url, following_url, also_known_as + FROM ap_remote_actors WHERE url = ?", + ) + .bind(actor_url) + .fetch_optional(&self.pool) + .await?; + + Ok(row.as_ref().map(|r| remote_actor_from_row(r, "url"))) + } + async fn add_announce( &self, activity_id: &str, @@ -548,6 +618,15 @@ impl FederationRepository for SqliteFederationRepository { Ok(()) } + async fn remove_announce(&self, activity_id: &str, actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM ap_announces WHERE id = ?1 AND actor_url = ?2") + .bind(activity_id) + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + async fn count_announces(&self, object_url: &str) -> Result { let row = sqlx::query("SELECT COUNT(*) as cnt FROM ap_announces WHERE object_url = ?1") .bind(object_url) @@ -555,7 +634,10 @@ impl FederationRepository for SqliteFederationRepository { .await?; Ok(row.get::("cnt") as usize) } +} +#[async_trait] +impl BlocklistRepository for SqliteFederationRepository { async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> Result<()> { let now = Utc::now().naive_utc(); let ts = datetime_to_str(&now); @@ -656,44 +738,29 @@ impl FederationRepository for SqliteFederationRepository { .await?; Ok(count > 0) } +} - async fn migrate_follower_actor( - &self, - old_actor_url: &str, - new_actor_url: &str, - ) -> Result> { - let candidates: Vec = sqlx::query_scalar( - "SELECT local_user_id FROM ap_following - WHERE remote_actor_url = ?1 - AND local_user_id NOT IN ( - SELECT local_user_id FROM ap_following WHERE remote_actor_url = ?2 - )", - ) - .bind(old_actor_url) - .bind(new_actor_url) - .fetch_all(&self.pool) - .await?; - - if candidates.is_empty() { - return Ok(vec![]); - } +#[async_trait] +impl ActivityRepository for SqliteFederationRepository { + async fn is_activity_processed(&self, activity_id: &str) -> Result { + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM ap_activities WHERE id = ?1") + .bind(activity_id) + .fetch_one(&self.pool) + .await?; + Ok(count > 0) + } + async fn mark_activity_processed(&self, activity_id: &str) -> Result<()> { + let ts = datetime_to_str(&Utc::now().naive_utc()); sqlx::query( - "UPDATE ap_following SET remote_actor_url = ?1 - WHERE remote_actor_url = ?2 - AND local_user_id NOT IN ( - SELECT local_user_id FROM ap_following WHERE remote_actor_url = ?1 - )", + "INSERT OR IGNORE INTO ap_activities (id, processed_at) VALUES (?1, ?2)", ) - .bind(new_actor_url) - .bind(old_actor_url) + .bind(activity_id) + .bind(&ts) .execute(&self.pool) .await?; - - candidates - .into_iter() - .map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))) - .collect() + Ok(()) } } @@ -957,13 +1024,19 @@ impl RemoteWatchlistRepository for SqliteFederationRepository { pub fn wire( pool: sqlx::SqlitePool, ) -> ( - std::sync::Arc, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, std::sync::Arc, std::sync::Arc, std::sync::Arc, ) { let fed = std::sync::Arc::new(SqliteFederationRepository::new(pool)); ( + std::sync::Arc::clone(&fed) as _, + std::sync::Arc::clone(&fed) as _, + std::sync::Arc::clone(&fed) as _, std::sync::Arc::clone(&fed) as _, std::sync::Arc::clone(&fed) as _, std::sync::Arc::clone(&fed) as _, @@ -974,7 +1047,7 @@ pub fn wire( #[cfg(test)] mod outbox_url_tests { use super::*; - use k_ap::{FederationRepository, FollowingStatus, RemoteActor}; + use k_ap::{FollowRepository, FollowingStatus, RemoteActor}; async fn setup_pool() -> SqlitePool { let pool = SqlitePool::connect(":memory:").await.unwrap(); @@ -982,7 +1055,8 @@ mod outbox_url_tests { "CREATE TABLE ap_remote_actors ( url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT, avatar_url TEXT, - outbox_url TEXT, fetched_at TEXT NOT NULL + outbox_url TEXT, bio TEXT, banner_url TEXT, followers_url TEXT, + following_url TEXT, also_known_as TEXT, fetched_at TEXT NOT NULL ); CREATE TABLE ap_following ( local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, @@ -1010,6 +1084,11 @@ mod outbox_url_tests { display_name: None, avatar_url: None, outbox_url: Some("https://remote.example/users/alice/outbox".to_string()), + bio: None, + banner_url: None, + followers_url: None, + following_url: None, + also_known_as: vec![], }; repo.add_following(local_user, actor, "https://local/activities/1") .await diff --git a/crates/adapters/sqlite/migrations/0020_kap_v03.sql b/crates/adapters/sqlite/migrations/0020_kap_v03.sql new file mode 100644 index 0000000..7d6a9c6 --- /dev/null +++ b/crates/adapters/sqlite/migrations/0020_kap_v03.sql @@ -0,0 +1,12 @@ +-- k-ap 0.3: new RemoteActor fields + activity dedup table + +ALTER TABLE ap_remote_actors ADD COLUMN bio TEXT; +ALTER TABLE ap_remote_actors ADD COLUMN banner_url TEXT; +ALTER TABLE ap_remote_actors ADD COLUMN followers_url TEXT; +ALTER TABLE ap_remote_actors ADD COLUMN following_url TEXT; +ALTER TABLE ap_remote_actors ADD COLUMN also_known_as TEXT; -- JSON array + +CREATE TABLE IF NOT EXISTS ap_activities ( + id TEXT PRIMARY KEY, + processed_at TEXT NOT NULL +); diff --git a/crates/adapters/sqlite/migrations/0021_user_display_name.sql b/crates/adapters/sqlite/migrations/0021_user_display_name.sql new file mode 100644 index 0000000..1fc49e0 --- /dev/null +++ b/crates/adapters/sqlite/migrations/0021_user_display_name.sql @@ -0,0 +1 @@ +ALTER TABLE users ADD COLUMN display_name TEXT; diff --git a/crates/adapters/sqlite/src/ap_content.rs b/crates/adapters/sqlite/src/ap_content.rs index 4011082..b8c6948 100644 --- a/crates/adapters/sqlite/src/ap_content.rs +++ b/crates/adapters/sqlite/src/ap_content.rs @@ -106,4 +106,49 @@ impl LocalApContentQuery for SqliteApContentQuery { .map_err(Self::map_err)?; Ok(count as u64) } + + async fn get_local_reviews_page( + &self, + user_id: &UserId, + before: Option, + limit: usize, + ) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let limit_i64 = limit as i64; + + let rows = if let Some(before_ts) = before { + let ts = before_ts.format("%Y-%m-%d %H:%M:%S").to_string(); + sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? AND r.remote_actor_url IS NULL AND r.watched_at < ? + ORDER BY r.watched_at DESC + LIMIT ?", + ) + .bind(&uid) + .bind(&ts) + .bind(limit_i64) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + } else { + sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? AND r.remote_actor_url IS NULL + ORDER BY r.watched_at DESC + LIMIT ?", + ) + .bind(&uid) + .bind(limit_i64) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + }; + rows.into_iter().map(DiaryRow::into_domain).collect() + } } diff --git a/crates/adapters/sqlite/src/users.rs b/crates/adapters/sqlite/src/users.rs index c501077..fdac5e6 100644 --- a/crates/adapters/sqlite/src/users.rs +++ b/crates/adapters/sqlite/src/users.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use chrono::Utc; -use sqlx::SqlitePool; +use sqlx::{Row, SqlitePool}; use super::models::UserSummaryRow; use domain::{ @@ -31,105 +31,69 @@ impl SqliteUserRepository { } } - fn row_to_user( - id_str: String, - email_str: String, - username_str: String, - hash_str: String, - role: UserRole, - bio: Option, - avatar_path: Option, - banner_path: Option, - also_known_as: Option, - profile_fields: Vec, - ) -> Result { + fn row_to_user(row: &sqlx::sqlite::SqliteRow, profile_fields: Vec) -> Result { + let id_str: String = row.try_get("id").unwrap_or_default(); let id = uuid::Uuid::parse_str(&id_str) .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; - let email = - Email::new(email_str).map_err(|e| DomainError::InfrastructureError(e.to_string()))?; - let username = Username::new(username_str) + let email = Email::new(row.get("email")) .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; - let hash = PasswordHash::new(hash_str) + let username = Username::new(row.get("username")) .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + let hash = PasswordHash::new(row.get("password_hash")) + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + let role_str: String = row.get("role"); Ok(User::from_persistence( UserId::from_uuid(id), email, username, hash, - role, - bio, - avatar_path, - banner_path, - also_known_as, + Self::parse_role(&role_str), + row.try_get("display_name").ok().flatten(), + row.try_get("bio").ok().flatten(), + row.try_get("avatar_path").ok().flatten(), + row.try_get("banner_path").ok().flatten(), + row.try_get("also_known_as").ok().flatten(), profile_fields, )) } } +const USER_COLS: &str = "id, email, username, password_hash, role, display_name, bio, avatar_path, banner_path, also_known_as"; + #[async_trait] impl UserRepository for SqliteUserRepository { async fn find_by_email(&self, email: &Email) -> Result, DomainError> { let email_str = email.value(); - let row = sqlx::query!( - "SELECT id, email, username, password_hash, role, bio, avatar_path, banner_path, also_known_as FROM users WHERE email = ?", - email_str - ) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)?; + let row = sqlx::query(&format!("SELECT {USER_COLS} FROM users WHERE email = ?")) + .bind(email_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; - row.map(|r| { - Self::row_to_user( - r.id.unwrap_or_default(), - r.email, - r.username, - r.password_hash, - Self::parse_role(&r.role), - r.bio, - r.avatar_path, - r.banner_path, - r.also_known_as, - vec![], - ) - }) - .transpose() + row.as_ref() + .map(|r| Self::row_to_user(r, vec![])) + .transpose() } async fn find_by_username(&self, username: &Username) -> Result, DomainError> { let username_str = username.value(); - let row = sqlx::query!( - "SELECT id, email, username, password_hash, role, bio, avatar_path, banner_path, also_known_as FROM users WHERE username = ?", - username_str - ) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)?; + let row = sqlx::query(&format!("SELECT {USER_COLS} FROM users WHERE username = ?")) + .bind(username_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; - row.map(|r| { - Self::row_to_user( - r.id.unwrap_or_default(), - r.email, - r.username, - r.password_hash, - Self::parse_role(&r.role), - r.bio, - r.avatar_path, - r.banner_path, - r.also_known_as, - vec![], - ) - }) - .transpose() + row.as_ref() + .map(|r| Self::row_to_user(r, vec![])) + .transpose() } async fn save(&self, user: &User) -> Result<(), DomainError> { - // Check email uniqueness first (clearer error than INSERT OR IGNORE) if self.find_by_email(user.email()).await?.is_some() { return Err(DomainError::ValidationError( "Email already registered".into(), )); } - // Check username uniqueness if self.find_by_username(user.username()).await?.is_some() { return Err(DomainError::ValidationError( "Username already taken".into(), @@ -141,15 +105,20 @@ impl UserRepository for SqliteUserRepository { let username = user.username().value(); let hash = user.password_hash().value(); let created_at = Utc::now().to_rfc3339(); - let role = match user.role() { UserRole::Admin => "admin", UserRole::Standard => "standard", }; - sqlx::query!( + + sqlx::query( "INSERT INTO users (id, email, username, password_hash, created_at, role) VALUES (?, ?, ?, ?, ?, ?)", - id, email, username, hash, created_at, role ) + .bind(&id) + .bind(email) + .bind(username) + .bind(hash) + .bind(&created_at) + .bind(role) .execute(&self.pool) .await .map_err(Self::map_err)?; @@ -159,50 +128,37 @@ impl UserRepository for SqliteUserRepository { async fn find_by_id(&self, id: &UserId) -> Result, DomainError> { let id_str = id.value().to_string(); - let row = sqlx::query!( - "SELECT id, email, username, password_hash, role, bio, avatar_path, banner_path, also_known_as FROM users WHERE id = ?", - id_str - ) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)?; + let row = sqlx::query(&format!("SELECT {USER_COLS} FROM users WHERE id = ?")) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; let Some(r) = row else { return Ok(None) }; - let field_rows = sqlx::query!( + let field_rows = sqlx::query( "SELECT name, value FROM user_profile_fields WHERE user_id = ? ORDER BY position ASC", - id_str ) + .bind(&id_str) .fetch_all(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; let profile_fields = field_rows - .into_iter() + .iter() .map(|f| ProfileField { - name: f.name, - value: f.value, + name: f.get("name"), + value: f.get("value"), }) .collect(); - Self::row_to_user( - r.id.unwrap_or_default(), - r.email, - r.username, - r.password_hash, - Self::parse_role(&r.role), - r.bio, - r.avatar_path, - r.banner_path, - r.also_known_as, - profile_fields, - ) - .map(Some) + Self::row_to_user(&r, profile_fields).map(Some) } async fn update_profile( &self, user_id: &UserId, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, @@ -210,8 +166,9 @@ impl UserRepository for SqliteUserRepository { ) -> Result<(), DomainError> { let id_str = user_id.value().to_string(); sqlx::query( - "UPDATE users SET bio = ?, avatar_path = ?, banner_path = ?, also_known_as = ? WHERE id = ?", + "UPDATE users SET display_name = ?, bio = ?, avatar_path = ?, banner_path = ?, also_known_as = ? WHERE id = ?", ) + .bind(&display_name) .bind(&bio) .bind(&avatar_path) .bind(&banner_path) diff --git a/crates/application/src/commands.rs b/crates/application/src/commands.rs index 9d5649e..c0e664d 100644 --- a/crates/application/src/commands.rs +++ b/crates/application/src/commands.rs @@ -75,6 +75,7 @@ pub struct DeleteImportProfileCommand { pub struct UpdateProfileCommand { pub user_id: Uuid, + pub display_name: Option, pub bio: Option, pub avatar_bytes: Option>, pub avatar_content_type: Option, diff --git a/crates/application/src/use_cases/update_profile.rs b/crates/application/src/use_cases/update_profile.rs index cf36d7f..a898bc9 100644 --- a/crates/application/src/use_cases/update_profile.rs +++ b/crates/application/src/use_cases/update_profile.rs @@ -68,6 +68,7 @@ pub async fn execute(ctx: &AppContext, cmd: UpdateProfileCommand) -> Result<(), ctx.user_repository .update_profile( &user_id, + cmd.display_name, cmd.bio, new_avatar_path, new_banner_path, diff --git a/crates/domain/src/models/mod.rs b/crates/domain/src/models/mod.rs index 87b9e05..ed21d4d 100644 --- a/crates/domain/src/models/mod.rs +++ b/crates/domain/src/models/mod.rs @@ -321,6 +321,7 @@ pub struct User { username: Username, password_hash: PasswordHash, role: UserRole, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, @@ -341,6 +342,7 @@ impl User { username, password_hash, role, + display_name: None, bio: None, avatar_path: None, banner_path: None, @@ -355,6 +357,7 @@ impl User { username: Username, password_hash: PasswordHash, role: UserRole, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, @@ -367,6 +370,7 @@ impl User { username, password_hash, role, + display_name, bio, avatar_path, banner_path, @@ -381,11 +385,13 @@ impl User { pub fn update_profile( &mut self, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, also_known_as: Option, ) { + self.display_name = display_name; self.bio = bio; self.avatar_path = avatar_path; self.banner_path = banner_path; @@ -407,6 +413,9 @@ impl User { pub fn role(&self) -> &UserRole { &self.role } + pub fn display_name(&self) -> Option<&str> { + self.display_name.as_deref() + } pub fn bio(&self) -> Option<&str> { self.bio.as_deref() } diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 1eb9132..39370b3 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -185,6 +185,7 @@ pub trait UserRepository: Send + Sync { async fn update_profile( &self, user_id: &UserId, + display_name: Option, bio: Option, avatar_path: Option, banner_path: Option, @@ -398,4 +399,11 @@ pub trait LocalApContentQuery: Send + Sync { async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError>; async fn count_local_posts(&self) -> Result; + + async fn get_local_reviews_page( + &self, + user_id: &UserId, + before: Option, + limit: usize, + ) -> Result, DomainError>; } diff --git a/crates/presentation/src/handlers/api.rs b/crates/presentation/src/handlers/api.rs index 39bbd6f..66f60a9 100644 --- a/crates/presentation/src/handlers/api.rs +++ b/crates/presentation/src/handlers/api.rs @@ -475,6 +475,7 @@ pub async fn update_profile_handler( AuthenticatedUser(user_id): AuthenticatedUser, mut multipart: Multipart, ) -> impl IntoResponse { + let mut display_name: Option = None; let mut bio: Option = None; let mut avatar_bytes: Option> = None; let mut avatar_content_type: Option = None; @@ -485,6 +486,11 @@ pub async fn update_profile_handler( while let Ok(Some(field)) = multipart.next_field().await { let name = field.name().unwrap_or("").to_string(); match name.as_str() { + "display_name" => { + if let Ok(text) = field.text().await { + display_name = Some(text).filter(|s| !s.is_empty()); + } + } "bio" => { if let Ok(text) = field.text().await { bio = Some(text); @@ -519,6 +525,7 @@ pub async fn update_profile_handler( let cmd = application::commands::UpdateProfileCommand { user_id: user_id.value(), + display_name, bio, avatar_bytes, avatar_content_type, diff --git a/crates/presentation/src/handlers/html.rs b/crates/presentation/src/handlers/html.rs index 107f88c..65f6b47 100644 --- a/crates/presentation/src/handlers/html.rs +++ b/crates/presentation/src/handlers/html.rs @@ -834,6 +834,56 @@ pub async fn reject_follower( } } +#[cfg(feature = "federation")] +pub async fn get_followers_collection( + State(state): State, + Path(user_id): Path, + headers: axum::http::HeaderMap, + Query(params): Query>, +) -> impl IntoResponse { + let accept = headers + .get(axum::http::header::ACCEPT) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + if accept.contains("application/activity+json") || accept.contains("application/ld+json") { + let page = params.get("page").and_then(|p| p.parse::().ok()); + return match state.ap_service.followers_collection_json(user_id, page).await { + Ok(json) => ( + [(axum::http::header::CONTENT_TYPE, "application/activity+json")], + json, + ) + .into_response(), + Err(_) => StatusCode::NOT_FOUND.into_response(), + }; + } + axum::response::Redirect::to(&format!("/users/{}/followers-list", user_id)).into_response() +} + +#[cfg(feature = "federation")] +pub async fn get_following_collection( + State(state): State, + Path(user_id): Path, + headers: axum::http::HeaderMap, + Query(params): Query>, +) -> impl IntoResponse { + let accept = headers + .get(axum::http::header::ACCEPT) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + if accept.contains("application/activity+json") || accept.contains("application/ld+json") { + let page = params.get("page").and_then(|p| p.parse::().ok()); + return match state.ap_service.following_collection_json(user_id, page).await { + Ok(json) => ( + [(axum::http::header::CONTENT_TYPE, "application/activity+json")], + json, + ) + .into_response(), + Err(_) => StatusCode::NOT_FOUND.into_response(), + }; + } + axum::response::Redirect::to(&format!("/users/{}/following-list", user_id)).into_response() +} + #[cfg(feature = "federation")] pub async fn get_following_page( RequiredCookieUser(user_id): RequiredCookieUser, @@ -1503,6 +1553,7 @@ pub async fn post_profile_settings( State(state): State, mut multipart: Multipart, ) -> impl IntoResponse { + let mut display_name: Option = None; let mut bio: Option = None; let mut avatar_bytes: Option> = None; let mut avatar_content_type: Option = None; @@ -1517,6 +1568,11 @@ pub async fn post_profile_settings( while let Ok(Some(field)) = multipart.next_field().await { let name = field.name().unwrap_or("").to_string(); match name.as_str() { + "display_name" => { + if let Ok(text) = field.text().await { + display_name = Some(text).filter(|s| !s.is_empty()); + } + } "bio" => { if let Ok(text) = field.text().await { bio = Some(text); @@ -1569,6 +1625,7 @@ pub async fn post_profile_settings( let cmd = application::commands::UpdateProfileCommand { user_id: user_id.value(), + display_name, bio, avatar_bytes, avatar_content_type, diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 404f80a..649fe39 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -82,7 +82,7 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { #[cfg(feature = "federation")] let (event_publisher_arc, ap_router, ap_service, social_query, remote_watchlist_repo) = { - let (federation_repo, social_query_arc, review_store, remote_watchlist_repo) = + let (activity_repo, follow_repo, actor_repo, blocklist_repo, social_query_arc, review_store, remote_watchlist_repo) = match &db_pool { #[cfg(feature = "postgres-federation")] factory::DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()), @@ -119,7 +119,10 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { }; let ap = activitypub::wire( - federation_repo, + activity_repo, + follow_repo, + actor_repo, + blocklist_repo, review_store, remote_watchlist_repo.clone(), Arc::clone(&ap_content_repo), diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index 5b40c5a..8e65925 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -166,6 +166,14 @@ fn federation_html_routes() -> Router { "/users/{id}/followers/reject", routing::post(handlers::html::reject_follower), ) + .route( + "/users/{id}/followers", + routing::get(handlers::html::get_followers_collection), + ) + .route( + "/users/{id}/following", + routing::get(handlers::html::get_following_collection), + ) .route( "/users/{id}/following-list", routing::get(handlers::html::get_following_page), diff --git a/crates/worker/src/follow_backfill_handler.rs b/crates/worker/src/follow_backfill_handler.rs index 18eff5d..3886da3 100644 --- a/crates/worker/src/follow_backfill_handler.rs +++ b/crates/worker/src/follow_backfill_handler.rs @@ -20,7 +20,7 @@ impl EventHandler for FollowBackfillHandler { }; tracing::info!(actor = %remote_actor_url, outbox = %outbox_url, "starting outbox backfill"); self.ap_service - .backfill_outbox(outbox_url, remote_actor_url) + .import_remote_outbox(outbox_url, remote_actor_url) .await .map_err(|e| DomainError::InfrastructureError(e.to_string())) } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 15f2a4a..1fd7782 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> { ); // Wire federation repos early to get remote_watchlist_repo for AppContext. #[cfg(feature = "federation")] - let (fed_federation_repo, _fed_social_query, fed_review_store, fed_remote_watchlist_repo) = + let (fed_activity_repo, fed_follow_repo, fed_actor_repo, fed_blocklist_repo, _fed_social_query, fed_review_store, fed_remote_watchlist_repo) = match &db_pool { #[cfg(feature = "sqlite-federation")] db::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()), @@ -195,7 +195,10 @@ async fn main() -> anyhow::Result<()> { #[cfg(feature = "federation")] { let ap_wire = activitypub::wire( - fed_federation_repo, + fed_activity_repo, + fed_follow_repo, + fed_actor_repo, + fed_blocklist_repo, fed_review_store, fed_remote_watchlist_repo, fed_ap_content,