From 68a939f6c464e0703c21a925a8ebf4a16130c489 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 29 May 2026 10:58:44 +0200 Subject: [PATCH] Refactor code for improved readability and consistency - Simplified error handling in `PostgresApContentQuery` and `SqliteApContentQuery` by aligning the formatting of `try_get` calls. - Removed unnecessary line breaks and improved formatting in various repository implementations for better readability. - Consolidated imports in `lib.rs` and `factory.rs` to maintain a cleaner structure. - Enhanced consistency in async function signatures across multiple files. - Updated test helpers and use cases to streamline code and improve clarity. - Refactored `InMemory` repositories to enhance readability by aligning method implementations. --- .../activitypub/src/composite_handler.rs | 21 +- .../adapters/activitypub/src/event_handler.rs | 10 +- .../src/federation_event_bridge.rs | 17 +- crates/adapters/activitypub/src/objects.rs | 2 +- crates/adapters/activitypub/src/port.rs | 39 ++- .../activitypub/src/review_handler.rs | 24 +- .../adapters/activitypub/src/user_adapter.rs | 2 +- .../activitypub/src/watchlist_handler.rs | 21 +- .../adapters/postgres-federation/src/lib.rs | 278 ++++++++++++++---- crates/adapters/postgres/src/ap_content.rs | 29 +- crates/adapters/sqlite-federation/src/lib.rs | 50 ++-- crates/adapters/sqlite/src/ap_content.rs | 5 +- crates/adapters/sqlite/src/users.rs | 5 +- crates/application/src/test_helpers.rs | 9 +- .../src/use_cases/add_to_watchlist.rs | 2 +- .../src/use_cases/delete_review.rs | 16 +- crates/application/src/use_cases/register.rs | 14 +- crates/domain/src/ports.rs | 5 +- crates/domain/src/testing.rs | 156 +++++++--- crates/presentation/src/factory.rs | 18 +- crates/presentation/src/handlers/html.rs | 22 +- crates/presentation/src/main.rs | 29 +- crates/worker/src/main.rs | 28 +- 23 files changed, 578 insertions(+), 224 deletions(-) diff --git a/crates/adapters/activitypub/src/composite_handler.rs b/crates/adapters/activitypub/src/composite_handler.rs index 0863738..6da92b6 100644 --- a/crates/adapters/activitypub/src/composite_handler.rs +++ b/crates/adapters/activitypub/src/composite_handler.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use k_ap::{ApContentReader, ApObjectHandler}; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use k_ap::{ApContentReader, ApObjectHandler}; use url::Url; use crate::{review_handler::ReviewObjectHandler, watchlist_handler::WatchlistObjectHandler}; @@ -79,11 +79,19 @@ impl ApObjectHandler for CompositeObjectHandler { Ok(()) } - async fn on_announce_received(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_announce_received( + &self, + _object_url: &Url, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } - async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_announce_of_remote( + &self, + _object_url: &Url, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } @@ -91,7 +99,12 @@ impl ApObjectHandler for CompositeObjectHandler { Ok(()) } - async fn on_mention(&self, _thought_ap_id: &Url, _mentioned_user_uuid: uuid::Uuid, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_mention( + &self, + _thought_ap_id: &Url, + _mentioned_user_uuid: uuid::Uuid, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } } diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index dc4d290..a103ac9 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -105,7 +105,10 @@ impl ActivityPubEventHandler { .as_ref() .map(|m| m.title().value().to_string()) .unwrap_or_else(|| "Unknown".to_string()); - let release_year = movie.as_ref().map(|m| m.release_year().value()).unwrap_or(0); + let release_year = movie + .as_ref() + .map(|m| m.release_year().value()) + .unwrap_or(0); let poster_url = movie .as_ref() .and_then(|m| m.poster_path()) @@ -152,7 +155,10 @@ impl ActivityPubEventHandler { .as_ref() .map(|m| m.title().value().to_string()) .unwrap_or_else(|| "Unknown".to_string()); - let release_year = movie.as_ref().map(|m| m.release_year().value()).unwrap_or(0); + let release_year = movie + .as_ref() + .map(|m| m.release_year().value()) + .unwrap_or(0); let poster_url = movie .as_ref() .and_then(|m| m.poster_path()) diff --git a/crates/adapters/activitypub/src/federation_event_bridge.rs b/crates/adapters/activitypub/src/federation_event_bridge.rs index 2f9ca53..c350be7 100644 --- a/crates/adapters/activitypub/src/federation_event_bridge.rs +++ b/crates/adapters/activitypub/src/federation_event_bridge.rs @@ -21,15 +21,14 @@ impl k_ap::EventPublisher for FederationEventBridge { FederationEvent::BackfillRequested { owner_user_id, follower_inbox_url, - } => { - self.domain_publisher - .publish(&DomainEvent::BackfillFollower { - owner_user_id: UserId::from_uuid(owner_user_id), - follower_inbox_url, - }) - .await - .map_err(|e| anyhow::anyhow!(e.to_string())) - } + } => self + .domain_publisher + .publish(&DomainEvent::BackfillFollower { + owner_user_id: UserId::from_uuid(owner_user_id), + follower_inbox_url, + }) + .await + .map_err(|e| anyhow::anyhow!(e.to_string())), _ => { tracing::debug!("ignoring federation event: {:?}", event); Ok(()) diff --git a/crates/adapters/activitypub/src/objects.rs b/crates/adapters/activitypub/src/objects.rs index 05dd9e1..b6f1889 100644 --- a/crates/adapters/activitypub/src/objects.rs +++ b/crates/adapters/activitypub/src/objects.rs @@ -1,6 +1,6 @@ +use chrono::{DateTime, Utc}; use k_ap::AS_PUBLIC; use k_ap::NoteType; -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use url::Url; diff --git a/crates/adapters/activitypub/src/port.rs b/crates/adapters/activitypub/src/port.rs index 1271e94..a4e1cbb 100644 --- a/crates/adapters/activitypub/src/port.rs +++ b/crates/adapters/activitypub/src/port.rs @@ -32,9 +32,21 @@ pub trait ActivityPubPort: Send + Sync { async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()>; async fn get_blocked_domains(&self) -> anyhow::Result>; async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>; - async fn followers_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result; - async fn following_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result; - async fn run_backfill_for_follower(&self, owner_user_id: Uuid, follower_inbox_url: String) -> anyhow::Result<()>; + async fn followers_collection_json( + &self, + user_id: Uuid, + page: Option, + ) -> anyhow::Result; + async fn following_collection_json( + &self, + user_id: Uuid, + page: Option, + ) -> anyhow::Result; + async fn run_backfill_for_follower( + &self, + owner_user_id: Uuid, + follower_inbox_url: String, + ) -> anyhow::Result<()>; } #[async_trait] @@ -104,14 +116,27 @@ impl ActivityPubPort for ActivityPubService { async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { self.import_remote_outbox(outbox_url, actor_url).await } - async fn followers_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result { + async fn followers_collection_json( + &self, + user_id: Uuid, + page: Option, + ) -> anyhow::Result { self.followers_collection_json(user_id, page).await } - async fn following_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result { + async fn following_collection_json( + &self, + user_id: Uuid, + page: Option, + ) -> anyhow::Result { self.following_collection_json(user_id, page).await } - async fn run_backfill_for_follower(&self, owner_user_id: Uuid, follower_inbox_url: String) -> anyhow::Result<()> { - self.run_backfill_for_follower(owner_user_id, follower_inbox_url).await + async fn run_backfill_for_follower( + &self, + owner_user_id: Uuid, + follower_inbox_url: String, + ) -> anyhow::Result<()> { + self.run_backfill_for_follower(owner_user_id, follower_inbox_url) + .await } } diff --git a/crates/adapters/activitypub/src/review_handler.rs b/crates/adapters/activitypub/src/review_handler.rs index 2054cf6..f1142ca 100644 --- a/crates/adapters/activitypub/src/review_handler.rs +++ b/crates/adapters/activitypub/src/review_handler.rs @@ -1,12 +1,12 @@ use std::sync::Arc; -use k_ap::{ApContentReader, ApObjectHandler}; use async_trait::async_trait; use domain::{ models::ReviewSource, ports::LocalApContentQuery, value_objects::{Comment, MovieId, Rating, ReviewId, UserId}, }; +use k_ap::{ApContentReader, ApObjectHandler}; use url::Url; use crate::objects::{ReviewObject, review_to_ap_object}; @@ -39,7 +39,8 @@ impl ApContentReader for ReviewObjectHandler { let mut results = Vec::new(); for entry in entries { let review = entry.review(); - let published = chrono::DateTime::from_naive_utc_and_offset(*review.watched_at(), chrono::Utc); + let published = + chrono::DateTime::from_naive_utc_and_offset(*review.watched_at(), chrono::Utc); let movie = entry.movie(); let ap_id = review_url(&self.base_url, review.id()); let poster_url = movie @@ -168,11 +169,19 @@ impl ApObjectHandler for ReviewObjectHandler { Ok(()) } - async fn on_announce_received(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_announce_received( + &self, + _object_url: &Url, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } - async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_announce_of_remote( + &self, + _object_url: &Url, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } @@ -180,7 +189,12 @@ impl ApObjectHandler for ReviewObjectHandler { Ok(()) } - async fn on_mention(&self, _thought_ap_id: &Url, _mentioned_user_uuid: uuid::Uuid, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_mention( + &self, + _thought_ap_id: &Url, + _mentioned_user_uuid: uuid::Uuid, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } } diff --git a/crates/adapters/activitypub/src/user_adapter.rs b/crates/adapters/activitypub/src/user_adapter.rs index 0bd7810..c335cd2 100644 --- a/crates/adapters/activitypub/src/user_adapter.rs +++ b/crates/adapters/activitypub/src/user_adapter.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use k_ap::{ApProfileField, ApUser, ApUserRepository}; use async_trait::async_trait; use domain::{ports::UserRepository, value_objects::UserId}; +use k_ap::{ApProfileField, ApUser, ApUserRepository}; use url::Url; pub struct DomainUserRepoAdapter { diff --git a/crates/adapters/activitypub/src/watchlist_handler.rs b/crates/adapters/activitypub/src/watchlist_handler.rs index 45c8421..5fdd87c 100644 --- a/crates/adapters/activitypub/src/watchlist_handler.rs +++ b/crates/adapters/activitypub/src/watchlist_handler.rs @@ -1,11 +1,11 @@ use std::sync::Arc; -use k_ap::ApObjectHandler; use async_trait::async_trait; use domain::{ models::RemoteWatchlistEntry, ports::{LocalApContentQuery, RemoteWatchlistRepository}, }; +use k_ap::ApObjectHandler; use url::Url; use crate::objects::WatchlistObject; @@ -68,11 +68,19 @@ impl ApObjectHandler for WatchlistObjectHandler { Ok(()) } - async fn on_announce_received(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_announce_received( + &self, + _object_url: &Url, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } - async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_announce_of_remote( + &self, + _object_url: &Url, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } @@ -80,7 +88,12 @@ impl ApObjectHandler for WatchlistObjectHandler { Ok(()) } - async fn on_mention(&self, _thought_ap_id: &Url, _mentioned_user_uuid: uuid::Uuid, _actor_url: &Url) -> anyhow::Result<()> { + async fn on_mention( + &self, + _thought_ap_id: &Url, + _mentioned_user_uuid: uuid::Uuid, + _actor_url: &Url, + ) -> anyhow::Result<()> { Ok(()) } } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index 2bc4a3d..d237e5a 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -4,12 +4,12 @@ use chrono::{NaiveDateTime, Utc}; use sqlx::{PgPool, Row}; use activitypub::RemoteReviewRepository; +use domain::models::{RemoteWatchlistEntry, Review, ReviewSource}; +use domain::ports::RemoteWatchlistRepository; use k_ap::{ ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, }; -use domain::models::{RemoteWatchlistEntry, Review, ReviewSource}; -use domain::ports::RemoteWatchlistRepository; fn datetime_to_str(dt: &NaiveDateTime) -> String { dt.format("%Y-%m-%d %H:%M:%S").to_string() @@ -116,7 +116,11 @@ impl FollowRepository for PostgresFederationRepository { Ok(row) } - async fn remove_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<()> { + async fn remove_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result<()> { let uid = local_user_id.to_string(); sqlx::query("DELETE FROM ap_followers WHERE local_user_id = $1 AND remote_actor_url = $2") .bind(&uid) @@ -134,13 +138,24 @@ impl FollowRepository for PostgresFederationRepository { WHERE f.local_user_id = $1" ); let rows = sqlx::query(&q).bind(&uid).fetch_all(&self.pool).await?; - Ok(rows.iter().map(|row| { - let status_str: String = row.get("status"); - Follower { actor: pg_remote_actor(row, "remote_actor_url"), status: str_to_status(&status_str) } - }).collect()) + Ok(rows + .iter() + .map(|row| { + let status_str: String = row.get("status"); + Follower { + actor: pg_remote_actor(row, "remote_actor_url"), + status: str_to_status(&status_str), + } + }) + .collect()) } - async fn get_followers_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result> { + async fn get_followers_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result> { let uid = local_user_id.to_string(); let q = format!( "SELECT f.remote_actor_url, f.status, {PG_ACTOR_COLS} @@ -148,22 +163,41 @@ impl FollowRepository for PostgresFederationRepository { WHERE f.local_user_id = $1 AND f.status = 'accepted' ORDER BY f.created_at ASC LIMIT $2 OFFSET $3" ); - let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?; - Ok(rows.iter().map(|row| { - let status_str: String = row.get("status"); - Follower { actor: pg_remote_actor(row, "remote_actor_url"), status: str_to_status(&status_str) } - }).collect()) + let rows = sqlx::query(&q) + .bind(&uid) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await?; + Ok(rows + .iter() + .map(|row| { + let status_str: String = row.get("status"); + Follower { + actor: pg_remote_actor(row, "remote_actor_url"), + status: str_to_status(&status_str), + } + }) + .collect()) } async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result { let uid = local_user_id.to_string(); let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM ap_followers WHERE local_user_id = $1 AND status = 'accepted'", - ).bind(&uid).fetch_one(&self.pool).await?; + ) + .bind(&uid) + .fetch_one(&self.pool) + .await?; Ok(count as usize) } - async fn update_follower_status(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus) -> Result<()> { + async fn update_follower_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + ) -> Result<()> { let uid = local_user_id.to_string(); let status_str = status_to_str(&status); let result = sqlx::query( @@ -183,10 +217,16 @@ impl FollowRepository for PostgresFederationRepository { WHERE f.local_user_id = $1 AND f.status = 'pending'" ); let rows = sqlx::query(&q).bind(&uid).fetch_all(&self.pool).await?; - Ok(rows.iter().map(|row| pg_remote_actor(row, "remote_actor_url")).collect()) + Ok(rows + .iter() + .map(|row| pg_remote_actor(row, "remote_actor_url")) + .collect()) } - async fn get_accepted_follower_inboxes(&self, local_user_id: uuid::Uuid) -> Result> { + async fn get_accepted_follower_inboxes( + &self, + local_user_id: uuid::Uuid, + ) -> Result> { let uid = local_user_id.to_string(); let rows = sqlx::query( "SELECT DISTINCT COALESCE(a.shared_inbox_url, a.inbox_url) as inbox @@ -196,19 +236,33 @@ impl FollowRepository for PostgresFederationRepository { AND f.remote_actor_url NOT IN ( SELECT remote_actor_url FROM blocked_actors WHERE local_user_id = $1 )", - ).bind(&uid).fetch_all(&self.pool).await?; - Ok(rows.iter().filter_map(|r| r.try_get::("inbox").ok()).collect()) + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + Ok(rows + .iter() + .filter_map(|r| r.try_get::("inbox").ok()) + .collect()) } async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result { let uid = local_user_id.to_string(); let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM ap_followers WHERE local_user_id = $1 AND status = 'accepted'", - ).bind(&uid).fetch_one(&self.pool).await?; + ) + .bind(&uid) + .fetch_one(&self.pool) + .await?; Ok(count as usize) } - async fn get_accepted_followers_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result> { + async fn get_accepted_followers_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result> { let uid = local_user_id.to_string(); let q = format!( "SELECT f.remote_actor_url, {PG_ACTOR_COLS} @@ -216,11 +270,24 @@ impl FollowRepository for PostgresFederationRepository { WHERE f.local_user_id = $1 AND f.status = 'accepted' ORDER BY f.created_at ASC LIMIT $2 OFFSET $3" ); - let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?; - Ok(rows.iter().map(|row| pg_remote_actor(row, "remote_actor_url")).collect()) + let rows = sqlx::query(&q) + .bind(&uid) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await?; + Ok(rows + .iter() + .map(|row| pg_remote_actor(row, "remote_actor_url")) + .collect()) } - async fn add_following(&self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str) -> Result<()> { + async fn add_following( + &self, + local_user_id: uuid::Uuid, + actor: RemoteActor, + follow_activity_id: &str, + ) -> Result<()> { let uid = local_user_id.to_string(); let now = Utc::now().naive_utc(); let created_at = datetime_to_str(&now); @@ -232,7 +299,11 @@ impl FollowRepository for PostgresFederationRepository { Ok(()) } - async fn get_follow_activity_id(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result> { + async fn get_follow_activity_id( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result> { let uid = local_user_id.to_string(); let row: Option = sqlx::query_scalar( "SELECT follow_activity_id FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2", @@ -243,7 +314,10 @@ impl FollowRepository for PostgresFederationRepository { async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { let uid = local_user_id.to_string(); sqlx::query("DELETE FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2") - .bind(&uid).bind(actor_url).execute(&self.pool).await?; + .bind(&uid) + .bind(actor_url) + .execute(&self.pool) + .await?; Ok(()) } @@ -262,11 +336,19 @@ impl FollowRepository for PostgresFederationRepository { let uid = local_user_id.to_string(); let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM ap_following WHERE local_user_id = $1 AND status = 'accepted'", - ).bind(&uid).fetch_one(&self.pool).await?; + ) + .bind(&uid) + .fetch_one(&self.pool) + .await?; Ok(count as usize) } - async fn get_following_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result> { + async fn get_following_page( + &self, + local_user_id: uuid::Uuid, + offset: u32, + limit: usize, + ) -> Result> { let uid = local_user_id.to_string(); let q = format!( "SELECT a.url, {PG_ACTOR_COLS} @@ -274,13 +356,26 @@ impl FollowRepository for PostgresFederationRepository { WHERE f.local_user_id = $1 AND f.status = 'accepted' ORDER BY f.created_at ASC LIMIT $2 OFFSET $3" ); - let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?; + let rows = sqlx::query(&q) + .bind(&uid) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await?; Ok(rows.iter().map(|row| pg_remote_actor(row, "url")).collect()) } - async fn update_following_status(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowingStatus) -> Result<()> { + async fn update_following_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowingStatus, + ) -> Result<()> { let uid = local_user_id.to_string(); - let status_str = match status { FollowingStatus::Pending => "pending", FollowingStatus::Accepted => "accepted" }; + let status_str = match status { + FollowingStatus::Pending => "pending", + FollowingStatus::Accepted => "accepted", + }; let result = sqlx::query( "UPDATE ap_following SET status = $1 WHERE local_user_id = $2 AND remote_actor_url = $3", ).bind(status_str).bind(&uid).bind(remote_actor_url).execute(&self.pool).await?; @@ -290,7 +385,11 @@ impl FollowRepository for PostgresFederationRepository { Ok(()) } - async fn get_following_outbox_url(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result> { + async fn get_following_outbox_url( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result> { let uid = local_user_id.to_string(); let row: Option> = sqlx::query_scalar( "SELECT a.outbox_url FROM ap_following f INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url @@ -299,33 +398,53 @@ impl FollowRepository for PostgresFederationRepository { Ok(row.flatten()) } - async fn migrate_follower_actor(&self, old_actor_url: &str, new_actor_url: &str) -> Result> { + async fn migrate_follower_actor( + &self, + old_actor_url: &str, + new_actor_url: &str, + ) -> Result> { let candidates: Vec = sqlx::query_scalar( "SELECT local_user_id FROM ap_following WHERE remote_actor_url = $1 AND local_user_id NOT IN (SELECT local_user_id FROM ap_following WHERE remote_actor_url = $2)", ).bind(old_actor_url).bind(new_actor_url).fetch_all(&self.pool).await?; - if candidates.is_empty() { return Ok(vec![]); } + if candidates.is_empty() { + return Ok(vec![]); + } sqlx::query( "UPDATE ap_following SET remote_actor_url = $1 WHERE remote_actor_url = $2 AND local_user_id NOT IN (SELECT local_user_id FROM ap_following WHERE remote_actor_url = $1)", ).bind(new_actor_url).bind(old_actor_url).execute(&self.pool).await?; - candidates.into_iter().map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))).collect() + candidates + .into_iter() + .map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))) + .collect() } } #[async_trait] impl ActorRepository for PostgresFederationRepository { - async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result> { + async fn get_local_actor_keypair( + &self, + user_id: uuid::Uuid, + ) -> Result> { let uid = user_id.to_string(); - let row = sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = $1") - .bind(&uid).fetch_optional(&self.pool).await?; + let row = + sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = $1") + .bind(&uid) + .fetch_optional(&self.pool) + .await?; Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) } - async fn save_local_actor_keypair(&self, user_id: uuid::Uuid, public_key: String, private_key: String) -> Result<()> { + async fn save_local_actor_keypair( + &self, + user_id: uuid::Uuid, + public_key: String, + private_key: String, + ) -> Result<()> { let uid = user_id.to_string(); let now = Utc::now().naive_utc(); let created_at = datetime_to_str(&now); @@ -360,14 +479,21 @@ impl ActorRepository for PostgresFederationRepository { } async fn get_remote_actor(&self, actor_url: &str) -> Result> { - let q = format!( - "SELECT url, {PG_ACTOR_COLS} FROM ap_remote_actors a WHERE url = $1" - ); - let row = sqlx::query(&q).bind(actor_url).fetch_optional(&self.pool).await?; + let q = format!("SELECT url, {PG_ACTOR_COLS} FROM ap_remote_actors a WHERE url = $1"); + let row = sqlx::query(&q) + .bind(actor_url) + .fetch_optional(&self.pool) + .await?; Ok(row.as_ref().map(|r| pg_remote_actor(r, "url"))) } - async fn add_announce(&self, activity_id: &str, object_url: &str, actor_url: &str, announced_at: chrono::DateTime) -> Result<()> { + async fn add_announce( + &self, + activity_id: &str, + object_url: &str, + actor_url: &str, + announced_at: chrono::DateTime, + ) -> Result<()> { let ts = announced_at.format("%Y-%m-%d %H:%M:%S").to_string(); sqlx::query("INSERT INTO ap_announces (id, object_url, actor_url, announced_at) VALUES ($1, $2, $3, $4) ON CONFLICT (id) DO NOTHING") .bind(activity_id).bind(object_url).bind(actor_url).bind(&ts).execute(&self.pool).await?; @@ -376,13 +502,18 @@ impl ActorRepository for PostgresFederationRepository { async fn remove_announce(&self, activity_id: &str, actor_url: &str) -> Result<()> { sqlx::query("DELETE FROM ap_announces WHERE id = $1 AND actor_url = $2") - .bind(activity_id).bind(actor_url).execute(&self.pool).await?; + .bind(activity_id) + .bind(actor_url) + .execute(&self.pool) + .await?; Ok(()) } async fn count_announces(&self, object_url: &str) -> Result { let row = sqlx::query("SELECT COUNT(*) as cnt FROM ap_announces WHERE object_url = $1") - .bind(object_url).fetch_one(&self.pool).await?; + .bind(object_url) + .fetch_one(&self.pool) + .await?; Ok(row.get::("cnt") as usize) } } @@ -396,17 +527,33 @@ impl BlocklistRepository for PostgresFederationRepository { Ok(()) } async fn remove_blocked_domain(&self, domain: &str) -> Result<()> { - sqlx::query("DELETE FROM blocked_domains WHERE domain = $1").bind(domain).execute(&self.pool).await?; + sqlx::query("DELETE FROM blocked_domains WHERE domain = $1") + .bind(domain) + .execute(&self.pool) + .await?; Ok(()) } async fn get_blocked_domains(&self) -> Result> { - let rows = sqlx::query("SELECT domain, reason, blocked_at FROM blocked_domains ORDER BY blocked_at DESC") - .fetch_all(&self.pool).await?; - Ok(rows.iter().map(|r| BlockedDomain { domain: r.get("domain"), reason: r.get("reason"), blocked_at: r.get("blocked_at") }).collect()) + let rows = sqlx::query( + "SELECT domain, reason, blocked_at FROM blocked_domains ORDER BY blocked_at DESC", + ) + .fetch_all(&self.pool) + .await?; + Ok(rows + .iter() + .map(|r| BlockedDomain { + domain: r.get("domain"), + reason: r.get("reason"), + blocked_at: r.get("blocked_at"), + }) + .collect()) } async fn is_domain_blocked(&self, domain: &str) -> Result { - let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM blocked_domains WHERE domain = $1") - .bind(domain).fetch_one(&self.pool).await?; + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM blocked_domains WHERE domain = $1") + .bind(domain) + .fetch_one(&self.pool) + .await?; Ok(count > 0) } async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { @@ -418,15 +565,23 @@ impl BlocklistRepository for PostgresFederationRepository { } async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { let uid = local_user_id.to_string(); - sqlx::query("DELETE FROM blocked_actors WHERE local_user_id = $1 AND remote_actor_url = $2") - .bind(&uid).bind(actor_url).execute(&self.pool).await?; + sqlx::query( + "DELETE FROM blocked_actors WHERE local_user_id = $1 AND remote_actor_url = $2", + ) + .bind(&uid) + .bind(actor_url) + .execute(&self.pool) + .await?; Ok(()) } async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result> { let uid = local_user_id.to_string(); let rows = sqlx::query("SELECT remote_actor_url FROM blocked_actors WHERE local_user_id = $1 ORDER BY blocked_at DESC") .bind(&uid).fetch_all(&self.pool).await?; - Ok(rows.iter().map(|r| r.get::("remote_actor_url")).collect()) + Ok(rows + .iter() + .map(|r| r.get::("remote_actor_url")) + .collect()) } async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result { let uid = local_user_id.to_string(); @@ -440,13 +595,20 @@ impl BlocklistRepository for PostgresFederationRepository { impl ActivityRepository for PostgresFederationRepository { async fn is_activity_processed(&self, activity_id: &str) -> Result { let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM ap_activities WHERE id = $1") - .bind(activity_id).fetch_one(&self.pool).await?; + .bind(activity_id) + .fetch_one(&self.pool) + .await?; Ok(count > 0) } async fn mark_activity_processed(&self, activity_id: &str) -> Result<()> { let ts = datetime_to_str(&Utc::now().naive_utc()); - sqlx::query("INSERT INTO ap_activities (id, processed_at) VALUES ($1, $2) ON CONFLICT DO NOTHING") - .bind(activity_id).bind(&ts).execute(&self.pool).await?; + sqlx::query( + "INSERT INTO ap_activities (id, processed_at) VALUES ($1, $2) ON CONFLICT DO NOTHING", + ) + .bind(activity_id) + .bind(&ts) + .execute(&self.pool) + .await?; Ok(()) } } diff --git a/crates/adapters/postgres/src/ap_content.rs b/crates/adapters/postgres/src/ap_content.rs index aae0a2a..f55ef9a 100644 --- a/crates/adapters/postgres/src/ap_content.rs +++ b/crates/adapters/postgres/src/ap_content.rs @@ -90,12 +90,24 @@ impl LocalApContentQuery for PostgresApContentQuery { )?, }; let movie = MovieRow { - id: row.try_get("m_id").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, - external_metadata_id: row.try_get("external_metadata_id").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, - title: row.try_get("title").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, - release_year: row.try_get("release_year").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, - director: row.try_get("director").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, - poster_path: row.try_get("poster_path").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + id: row + .try_get("m_id") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + external_metadata_id: row + .try_get("external_metadata_id") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + title: row + .try_get("title") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + release_year: row + .try_get("release_year") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + director: row + .try_get("director") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + poster_path: row + .try_get("poster_path") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, } .into_domain()?; Ok(WatchlistWithMovie { entry, movie }) @@ -103,10 +115,7 @@ impl LocalApContentQuery for PostgresApContentQuery { .collect() } - async fn get_review_by_id( - &self, - review_id: &ReviewId, - ) -> Result, DomainError> { + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { let id = review_id.value().to_string(); sqlx::query_as::<_, ReviewRow>( "SELECT id, movie_id, user_id, rating, comment, diff --git a/crates/adapters/sqlite-federation/src/lib.rs b/crates/adapters/sqlite-federation/src/lib.rs index 77717ce..ccdd9b7 100644 --- a/crates/adapters/sqlite-federation/src/lib.rs +++ b/crates/adapters/sqlite-federation/src/lib.rs @@ -4,12 +4,12 @@ use chrono::{NaiveDateTime, Utc}; use sqlx::{Row, SqlitePool}; use activitypub::RemoteReviewRepository; +use domain::models::{RemoteWatchlistEntry, Review, ReviewSource}; +use domain::ports::RemoteWatchlistRepository; use k_ap::{ ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, }; -use domain::models::{RemoteWatchlistEntry, Review, ReviewSource}; -use domain::ports::RemoteWatchlistRepository; fn datetime_to_str(dt: &NaiveDateTime) -> String { dt.format("%Y-%m-%d %H:%M:%S").to_string() @@ -246,7 +246,10 @@ impl FollowRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - Ok(rows.iter().map(|row| remote_actor_from_row(row, "remote_actor_url")).collect()) + Ok(rows + .iter() + .map(|row| remote_actor_from_row(row, "remote_actor_url")) + .collect()) } async fn get_accepted_follower_inboxes( @@ -268,7 +271,10 @@ impl FollowRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - Ok(rows.iter().filter_map(|r| r.try_get::("inbox").ok()).collect()) + Ok(rows + .iter() + .filter_map(|r| r.try_get::("inbox").ok()) + .collect()) } async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result { @@ -308,7 +314,10 @@ impl FollowRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - Ok(rows.iter().map(|row| remote_actor_from_row(row, "remote_actor_url")).collect()) + Ok(rows + .iter() + .map(|row| remote_actor_from_row(row, "remote_actor_url")) + .collect()) } async fn add_following( @@ -377,7 +386,10 @@ impl FollowRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - Ok(rows.iter().map(|row| remote_actor_from_row(row, "url")).collect()) + Ok(rows + .iter() + .map(|row| remote_actor_from_row(row, "url")) + .collect()) } async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { @@ -416,7 +428,10 @@ impl FollowRepository for SqliteFederationRepository { .fetch_all(&self.pool) .await?; - Ok(rows.iter().map(|row| remote_actor_from_row(row, "url")).collect()) + Ok(rows + .iter() + .map(|row| remote_actor_from_row(row, "url")) + .collect()) } async fn update_following_status( @@ -748,23 +763,20 @@ impl BlocklistRepository for SqliteFederationRepository { #[async_trait] impl ActivityRepository for SqliteFederationRepository { async fn is_activity_processed(&self, activity_id: &str) -> Result { - let count: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM ap_activities WHERE id = ?1") - .bind(activity_id) - .fetch_one(&self.pool) - .await?; + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM ap_activities WHERE id = ?1") + .bind(activity_id) + .fetch_one(&self.pool) + .await?; Ok(count > 0) } async fn mark_activity_processed(&self, activity_id: &str) -> Result<()> { let ts = datetime_to_str(&Utc::now().naive_utc()); - sqlx::query( - "INSERT OR IGNORE INTO ap_activities (id, processed_at) VALUES (?1, ?2)", - ) - .bind(activity_id) - .bind(&ts) - .execute(&self.pool) - .await?; + sqlx::query("INSERT OR IGNORE INTO ap_activities (id, processed_at) VALUES (?1, ?2)") + .bind(activity_id) + .bind(&ts) + .execute(&self.pool) + .await?; Ok(()) } } diff --git a/crates/adapters/sqlite/src/ap_content.rs b/crates/adapters/sqlite/src/ap_content.rs index b8c6948..5b507e7 100644 --- a/crates/adapters/sqlite/src/ap_content.rs +++ b/crates/adapters/sqlite/src/ap_content.rs @@ -67,10 +67,7 @@ impl LocalApContentQuery for SqliteApContentQuery { rows.into_iter().map(WatchlistRow::into_domain).collect() } - async fn get_review_by_id( - &self, - review_id: &ReviewId, - ) -> Result, DomainError> { + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { let id = review_id.value().to_string(); sqlx::query_as::<_, ReviewRow>( "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url diff --git a/crates/adapters/sqlite/src/users.rs b/crates/adapters/sqlite/src/users.rs index fdac5e6..c445d75 100644 --- a/crates/adapters/sqlite/src/users.rs +++ b/crates/adapters/sqlite/src/users.rs @@ -31,7 +31,10 @@ impl SqliteUserRepository { } } - fn row_to_user(row: &sqlx::sqlite::SqliteRow, profile_fields: Vec) -> Result { + fn row_to_user( + row: &sqlx::sqlite::SqliteRow, + profile_fields: Vec, + ) -> Result { let id_str: String = row.try_get("id").unwrap_or_default(); let id = uuid::Uuid::parse_str(&id_str) .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; diff --git a/crates/application/src/test_helpers.rs b/crates/application/src/test_helpers.rs index 85ff15d..c3befc6 100644 --- a/crates/application/src/test_helpers.rs +++ b/crates/application/src/test_helpers.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +#[cfg(feature = "federation")] +use domain::testing::PanicRemoteWatchlistRepository; use domain::{ ports::{ AuthService, DiaryExporter, DiaryRepository, DocumentParser, EventPublisher, ImageStorage, @@ -18,13 +20,8 @@ use domain::{ PanicSearchPort, PanicStatsRepository, }, }; -#[cfg(feature = "federation")] -use domain::testing::PanicRemoteWatchlistRepository; -use crate::{ - config::AppConfig, - context::AppContext, -}; +use crate::{config::AppConfig, context::AppContext}; pub struct TestContextBuilder { pub movie_repo: Arc, diff --git a/crates/application/src/use_cases/add_to_watchlist.rs b/crates/application/src/use_cases/add_to_watchlist.rs index 1b12360..f3b6fb2 100644 --- a/crates/application/src/use_cases/add_to_watchlist.rs +++ b/crates/application/src/use_cases/add_to_watchlist.rs @@ -68,8 +68,8 @@ mod tests { use domain::{ models::Movie, ports::MovieRepository, - value_objects::{MovieTitle, ReleaseYear}, testing::{InMemoryMovieRepository, InMemoryWatchlistRepository}, + value_objects::{MovieTitle, ReleaseYear}, }; use crate::{ diff --git a/crates/application/src/use_cases/delete_review.rs b/crates/application/src/use_cases/delete_review.rs index afffd69..13cda8f 100644 --- a/crates/application/src/use_cases/delete_review.rs +++ b/crates/application/src/use_cases/delete_review.rs @@ -62,17 +62,15 @@ mod tests { use domain::{ models::{Movie, Review}, ports::{MovieRepository, ReviewRepository}, - value_objects::{MovieId, MovieTitle, Rating, ReleaseYear, UserId}, testing::{ FakeDiaryRepository, InMemoryMovieRepository, InMemoryReviewRepository, NoopEventPublisher, }, + value_objects::{MovieId, MovieTitle, Rating, ReleaseYear, UserId}, }; use crate::{ - commands::DeleteReviewCommand, - test_helpers::TestContextBuilder, - use_cases::delete_review, + commands::DeleteReviewCommand, test_helpers::TestContextBuilder, use_cases::delete_review, }; fn make_movie() -> Movie { @@ -86,8 +84,14 @@ mod tests { } fn make_review(movie_id: MovieId, user_id: UserId) -> Review { - Review::new(movie_id, user_id, Rating::new(4).unwrap(), None, Utc::now().naive_utc()) - .unwrap() + Review::new( + movie_id, + user_id, + Rating::new(4).unwrap(), + None, + Utc::now().naive_utc(), + ) + .unwrap() } #[tokio::test] diff --git a/crates/application/src/use_cases/register.rs b/crates/application/src/use_cases/register.rs index e208796..c2e4a4d 100644 --- a/crates/application/src/use_cases/register.rs +++ b/crates/application/src/use_cases/register.rs @@ -54,11 +54,7 @@ mod tests { use domain::testing::InMemoryUserRepository; use domain::value_objects::Email; - use crate::{ - commands::RegisterCommand, - test_helpers::TestContextBuilder, - use_cases::register, - }; + use crate::{commands::RegisterCommand, test_helpers::TestContextBuilder, use_cases::register}; fn cmd(email: &str) -> RegisterCommand { RegisterCommand { @@ -76,7 +72,9 @@ mod tests { .with_users(Arc::clone(&users) as _) .build(); - register::execute(&ctx, cmd("alice@example.com")).await.unwrap(); + register::execute(&ctx, cmd("alice@example.com")) + .await + .unwrap(); let email = Email::new("alice@example.com".into()).unwrap(); let user = users.find_by_email(&email).await.unwrap().unwrap(); @@ -91,7 +89,9 @@ mod tests { .with_users(Arc::clone(&users) as _) .build(); - register::execute(&ctx, cmd("bob@example.com")).await.unwrap(); + register::execute(&ctx, cmd("bob@example.com")) + .await + .unwrap(); let result = register::execute(&ctx, cmd("bob@example.com")).await; assert!(result.is_err(), "duplicate email should fail"); } diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 39370b3..cf02cae 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -391,10 +391,7 @@ pub trait LocalApContentQuery: Send + Sync { user_id: &UserId, ) -> Result, DomainError>; - async fn get_review_by_id( - &self, - review_id: &ReviewId, - ) -> Result, DomainError>; + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError>; async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError>; diff --git a/crates/domain/src/testing.rs b/crates/domain/src/testing.rs index c2ef607..743e0b7 100644 --- a/crates/domain/src/testing.rs +++ b/crates/domain/src/testing.rs @@ -11,16 +11,16 @@ use crate::{ errors::DomainError, events::DomainEvent, models::{ - DiaryEntry, DiaryFilter, ExportFormat, FeedEntry, FieldMapping, FileFormat, ImportError, - ImportProfile, ImportSession, IndexableDocument, Movie, MovieFilter, MovieProfile, - MovieStats, MovieSummary, ParsedFile, Person, PersonCredits, PersonId, ExternalPersonId, - Review, ReviewHistory, SearchQuery, SearchResults, User, UserStats, UserSummary, - UserTrends, WatchlistEntry, WatchlistWithMovie, AnnotatedRow, EntityType, + AnnotatedRow, DiaryEntry, DiaryFilter, EntityType, ExportFormat, ExternalPersonId, + FeedEntry, FieldMapping, FileFormat, ImportError, ImportProfile, ImportSession, + IndexableDocument, Movie, MovieFilter, MovieProfile, MovieStats, MovieSummary, ParsedFile, + Person, PersonCredits, PersonId, Review, ReviewHistory, SearchQuery, SearchResults, User, + UserStats, UserSummary, UserTrends, WatchlistEntry, WatchlistWithMovie, collections::{PageParams, Paginated}, }, ports::{ - AuthService, DiaryExporter, DiaryRepository, DocumentParser, EventPublisher, - FeedSortBy, FollowingFilter, GeneratedToken, ImageStorage, ImportProfileRepository, + AuthService, DiaryExporter, DiaryRepository, DocumentParser, EventPublisher, FeedSortBy, + FollowingFilter, GeneratedToken, ImageStorage, ImportProfileRepository, ImportSessionRepository, MetadataClient, MetadataSearchCriteria, MovieProfileRepository, MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient, ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository, @@ -40,7 +40,9 @@ pub struct InMemoryMovieRepository { impl InMemoryMovieRepository { pub fn new() -> Arc { - Arc::new(Self { store: Mutex::new(HashMap::new()) }) + Arc::new(Self { + store: Mutex::new(HashMap::new()), + }) } pub fn count(&self) -> usize { @@ -55,11 +57,14 @@ impl MovieRepository for InMemoryMovieRepository { external_metadata_id: &ExternalMetadataId, ) -> Result, DomainError> { let store = self.store.lock().unwrap(); - Ok(store.values().find(|m| { - m.external_metadata_id() - .map(|e| e.value() == external_metadata_id.value()) - .unwrap_or(false) - }).cloned()) + Ok(store + .values() + .find(|m| { + m.external_metadata_id() + .map(|e| e.value() == external_metadata_id.value()) + .unwrap_or(false) + }) + .cloned()) } async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { @@ -72,11 +77,18 @@ impl MovieRepository for InMemoryMovieRepository { year: &ReleaseYear, ) -> Result, DomainError> { let store = self.store.lock().unwrap(); - Ok(store.values().filter(|m| m.title() == title && m.release_year() == year).cloned().collect()) + Ok(store + .values() + .filter(|m| m.title() == title && m.release_year() == year) + .cloned() + .collect()) } async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError> { - self.store.lock().unwrap().insert(movie.id().value(), movie.clone()); + self.store + .lock() + .unwrap() + .insert(movie.id().value(), movie.clone()); Ok(()) } @@ -90,7 +102,12 @@ impl MovieRepository for InMemoryMovieRepository { _page: &crate::models::collections::PageParams, _filter: &MovieFilter, ) -> Result, DomainError> { - Ok(Paginated { items: vec![], total_count: 0, limit: 10, offset: 0 }) + Ok(Paginated { + items: vec![], + total_count: 0, + limit: 10, + offset: 0, + }) } } @@ -102,7 +119,9 @@ pub struct InMemoryReviewRepository { impl InMemoryReviewRepository { pub fn new() -> Arc { - Arc::new(Self { store: Mutex::new(HashMap::new()) }) + Arc::new(Self { + store: Mutex::new(HashMap::new()), + }) } pub fn count(&self) -> usize { @@ -113,7 +132,10 @@ impl InMemoryReviewRepository { #[async_trait] impl ReviewRepository for InMemoryReviewRepository { async fn save_review(&self, review: &Review) -> Result { - self.store.lock().unwrap().insert(review.id().value(), review.clone()); + self.store + .lock() + .unwrap() + .insert(review.id().value(), review.clone()); Ok(DomainEvent::ReviewLogged { review_id: review.id().clone(), movie_id: review.movie_id().clone(), @@ -134,7 +156,11 @@ impl ReviewRepository for InMemoryReviewRepository { async fn get_all_reviews_for_user(&self, user_id: &UserId) -> Result, DomainError> { let store = self.store.lock().unwrap(); - Ok(store.values().filter(|r| r.user_id() == user_id).cloned().collect()) + Ok(store + .values() + .filter(|r| r.user_id() == user_id) + .cloned() + .collect()) } } @@ -146,7 +172,9 @@ pub struct InMemoryUserRepository { impl InMemoryUserRepository { pub fn new() -> Arc { - Arc::new(Self { store: Mutex::new(HashMap::new()) }) + Arc::new(Self { + store: Mutex::new(HashMap::new()), + }) } pub fn count(&self) -> usize { @@ -158,16 +186,25 @@ impl InMemoryUserRepository { impl UserRepository for InMemoryUserRepository { async fn find_by_email(&self, email: &Email) -> Result, DomainError> { let store = self.store.lock().unwrap(); - Ok(store.values().find(|u| u.email().value() == email.value()).cloned()) + Ok(store + .values() + .find(|u| u.email().value() == email.value()) + .cloned()) } async fn find_by_username(&self, username: &Username) -> Result, DomainError> { let store = self.store.lock().unwrap(); - Ok(store.values().find(|u| u.username().value() == username.value()).cloned()) + Ok(store + .values() + .find(|u| u.username().value() == username.value()) + .cloned()) } async fn save(&self, user: &User) -> Result<(), DomainError> { - self.store.lock().unwrap().insert(user.id().value(), user.clone()); + self.store + .lock() + .unwrap() + .insert(user.id().value(), user.clone()); Ok(()) } @@ -199,7 +236,9 @@ pub struct InMemoryWatchlistRepository { impl InMemoryWatchlistRepository { pub fn new() -> Arc { - Arc::new(Self { store: Mutex::new(HashMap::new()) }) + Arc::new(Self { + store: Mutex::new(HashMap::new()), + }) } pub fn count(&self) -> usize { @@ -211,13 +250,20 @@ impl InMemoryWatchlistRepository { impl WatchlistRepository for InMemoryWatchlistRepository { async fn add(&self, entry: &WatchlistEntry) -> Result<(), DomainError> { let key = (entry.user_id.value(), entry.movie_id.value()); - self.store.lock().unwrap().entry(key).or_insert_with(|| entry.clone()); + self.store + .lock() + .unwrap() + .entry(key) + .or_insert_with(|| entry.clone()); Ok(()) } async fn remove(&self, user_id: &UserId, movie_id: &MovieId) -> Result<(), DomainError> { let key = (user_id.value(), movie_id.value()); - self.store.lock().unwrap().remove(&key) + self.store + .lock() + .unwrap() + .remove(&key) .ok_or_else(|| DomainError::NotFound("watchlist entry".into()))?; Ok(()) } @@ -236,7 +282,12 @@ impl WatchlistRepository for InMemoryWatchlistRepository { _user_id: &UserId, _page: &PageParams, ) -> Result, DomainError> { - Ok(Paginated { items: vec![], total_count: 0, limit: 10, offset: 0 }) + Ok(Paginated { + items: vec![], + total_count: 0, + limit: 10, + offset: 0, + }) } async fn contains(&self, user_id: &UserId, movie_id: &MovieId) -> Result { @@ -253,7 +304,9 @@ pub struct NoopEventPublisher { impl NoopEventPublisher { pub fn new() -> Arc { - Arc::new(Self { events: Mutex::new(vec![]) }) + Arc::new(Self { + events: Mutex::new(vec![]), + }) } pub fn published(&self) -> Vec { @@ -333,7 +386,9 @@ impl MetadataClient for FakeMetadataClient { &self, _criteria: &MetadataSearchCriteria, ) -> Result { - Err(DomainError::InfrastructureError("fake metadata client".into())) + Err(DomainError::InfrastructureError( + "fake metadata client".into(), + )) } async fn get_poster_url( @@ -352,17 +407,25 @@ pub struct FakeDiaryRepository { impl FakeDiaryRepository { pub fn new() -> Arc { - Arc::new(Self { histories: Mutex::new(HashMap::new()) }) + Arc::new(Self { + histories: Mutex::new(HashMap::new()), + }) } pub fn seed_history(&self, movie: Movie, reviews: Vec) { - self.histories.lock().unwrap().insert(movie.id().value(), (movie, reviews)); + self.histories + .lock() + .unwrap() + .insert(movie.id().value(), (movie, reviews)); } } #[async_trait] impl DiaryRepository for FakeDiaryRepository { - async fn query_diary(&self, _filter: &DiaryFilter) -> Result, DomainError> { + async fn query_diary( + &self, + _filter: &DiaryFilter, + ) -> Result, DomainError> { unimplemented!("FakeDiaryRepository::query_diary") } @@ -418,7 +481,10 @@ pub struct PanicDiaryRepository; #[async_trait] impl DiaryRepository for PanicDiaryRepository { - async fn query_diary(&self, _filter: &DiaryFilter) -> Result, DomainError> { + async fn query_diary( + &self, + _filter: &DiaryFilter, + ) -> Result, DomainError> { panic!("PanicDiaryRepository called") } @@ -605,8 +671,18 @@ pub struct PanicSearchPort; impl SearchPort for PanicSearchPort { async fn search(&self, _query: &SearchQuery) -> Result { Ok(SearchResults { - movies: Paginated { items: vec![], total_count: 0, limit: 10, offset: 0 }, - people: Paginated { items: vec![], total_count: 0, limit: 10, offset: 0 }, + movies: Paginated { + items: vec![], + total_count: 0, + limit: 10, + offset: 0, + }, + people: Paginated { + items: vec![], + total_count: 0, + limit: 10, + offset: 0, + }, }) } } @@ -678,13 +754,19 @@ impl crate::ports::RemoteWatchlistRepository for PanicRemoteWatchlistRepository async fn remove_by_ap_id(&self, _: &str, _: &str) -> Result<(), DomainError> { panic!("PanicRemoteWatchlistRepository called") } - async fn get_by_actor_url(&self, _: &str) -> Result, DomainError> { + async fn get_by_actor_url( + &self, + _: &str, + ) -> Result, DomainError> { panic!("PanicRemoteWatchlistRepository called") } async fn remove_all_by_actor(&self, _: &str) -> Result<(), DomainError> { panic!("PanicRemoteWatchlistRepository called") } - async fn get_by_derived_uuid(&self, _: uuid::Uuid) -> Result, DomainError> { + async fn get_by_derived_uuid( + &self, + _: uuid::Uuid, + ) -> Result, DomainError> { panic!("PanicRemoteWatchlistRepository called") } } diff --git a/crates/presentation/src/factory.rs b/crates/presentation/src/factory.rs index 65e7d4c..b31703d 100644 --- a/crates/presentation/src/factory.rs +++ b/crates/presentation/src/factory.rs @@ -3,11 +3,10 @@ use std::sync::Arc; use anyhow::Context; use domain::ports::{ - AuthService, DiaryRepository, ImageStorage, ImportProfileRepository, - ImportSessionRepository, LocalApContentQuery, MetadataClient, MovieProfileRepository, - MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient, - ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository, - UserRepository, WatchlistRepository, + AuthService, DiaryRepository, ImageStorage, ImportProfileRepository, ImportSessionRepository, + LocalApContentQuery, MetadataClient, MovieProfileRepository, MovieRepository, PasswordHasher, + PersonCommand, PersonQuery, PosterFetcherClient, ReviewRepository, SearchCommand, SearchPort, + StatsRepository, UserProfileFieldsRepository, UserRepository, WatchlistRepository, }; pub struct DatabaseAdapters { @@ -36,10 +35,7 @@ pub enum DbPool { Postgres(sqlx::PgPool), } -pub async fn build_database_adapters( - backend: &str, - url: &str, -) -> anyhow::Result { +pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result { match backend { #[cfg(feature = "postgres")] "postgres" => { @@ -118,7 +114,9 @@ pub fn build_image_storage() -> anyhow::Result> { image_storage::create() } -pub fn build_profile_fields_repo(pool: &DbPool) -> anyhow::Result> { +pub fn build_profile_fields_repo( + pool: &DbPool, +) -> anyhow::Result> { match pool { #[cfg(feature = "postgres")] DbPool::Postgres(pool) => Ok(postgres::create_profile_fields_repo(pool.clone())), diff --git a/crates/presentation/src/handlers/html.rs b/crates/presentation/src/handlers/html.rs index 65f6b47..f492aef 100644 --- a/crates/presentation/src/handlers/html.rs +++ b/crates/presentation/src/handlers/html.rs @@ -847,9 +847,16 @@ pub async fn get_followers_collection( .unwrap_or(""); if accept.contains("application/activity+json") || accept.contains("application/ld+json") { let page = params.get("page").and_then(|p| p.parse::().ok()); - return match state.ap_service.followers_collection_json(user_id, page).await { + return match state + .ap_service + .followers_collection_json(user_id, page) + .await + { Ok(json) => ( - [(axum::http::header::CONTENT_TYPE, "application/activity+json")], + [( + axum::http::header::CONTENT_TYPE, + "application/activity+json", + )], json, ) .into_response(), @@ -872,9 +879,16 @@ pub async fn get_following_collection( .unwrap_or(""); if accept.contains("application/activity+json") || accept.contains("application/ld+json") { let page = params.get("page").and_then(|p| p.parse::().ok()); - return match state.ap_service.following_collection_json(user_id, page).await { + return match state + .ap_service + .following_collection_json(user_id, page) + .await + { Ok(json) => ( - [(axum::http::header::CONTENT_TYPE, "application/activity+json")], + [( + axum::http::header::CONTENT_TYPE, + "application/activity+json", + )], json, ) .into_response(), diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 649fe39..f22bf39 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -82,17 +82,24 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { #[cfg(feature = "federation")] let (event_publisher_arc, ap_router, ap_service, social_query, remote_watchlist_repo) = { - let (activity_repo, follow_repo, actor_repo, blocklist_repo, social_query_arc, review_store, remote_watchlist_repo) = - match &db_pool { - #[cfg(feature = "postgres-federation")] - factory::DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()), - #[cfg(feature = "sqlite-federation")] - factory::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()), - #[cfg(not(feature = "sqlite-federation"))] - _ => anyhow::bail!( - "DATABASE_BACKEND={backend} federation is not supported by this build" - ), - }; + let ( + activity_repo, + follow_repo, + actor_repo, + blocklist_repo, + social_query_arc, + review_store, + remote_watchlist_repo, + ) = match &db_pool { + #[cfg(feature = "postgres-federation")] + factory::DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()), + #[cfg(feature = "sqlite-federation")] + factory::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()), + #[cfg(not(feature = "sqlite-federation"))] + _ => anyhow::bail!( + "DATABASE_BACKEND={backend} federation is not supported by this build" + ), + }; let ep: Arc = match event_bus { EventBusBackend::Db => { diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 1fd7782..b905a70 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -47,12 +47,7 @@ async fn main() -> anyhow::Result<()> { // Clone refs federation handler needs before ctx consumes them. #[cfg(feature = "federation")] - let ( - fed_ap_content, - fed_user_repo, - base_url, - allow_registration, - ) = ( + let (fed_ap_content, fed_user_repo, base_url, allow_registration) = ( Arc::clone(&repos.ap_content), Arc::clone(&repos.user), app_config.base_url.clone(), @@ -60,13 +55,20 @@ async fn main() -> anyhow::Result<()> { ); // Wire federation repos early to get remote_watchlist_repo for AppContext. #[cfg(feature = "federation")] - let (fed_activity_repo, fed_follow_repo, fed_actor_repo, fed_blocklist_repo, _fed_social_query, fed_review_store, fed_remote_watchlist_repo) = - match &db_pool { - #[cfg(feature = "sqlite-federation")] - db::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()), - #[cfg(feature = "postgres-federation")] - db::DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()), - }; + let ( + fed_activity_repo, + fed_follow_repo, + fed_actor_repo, + fed_blocklist_repo, + _fed_social_query, + fed_review_store, + fed_remote_watchlist_repo, + ) = match &db_pool { + #[cfg(feature = "sqlite-federation")] + db::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()), + #[cfg(feature = "postgres-federation")] + db::DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()), + }; let ctx = AppContext { movie_repository: repos.movie,