diff --git a/Cargo.lock b/Cargo.lock index 4c24075..13fa2aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2770,8 +2770,8 @@ dependencies = [ [[package]] name = "k-ap" -version = "0.1.0" -source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.3#7901b29f7c09415e82f7f098f89c1df6b86bbfd3" +version = "0.1.10" +source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.10#d80cfd0431205498161db8665fd884710866ca95" dependencies = [ "activitypub_federation", "anyhow", diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml index c392b9d..180eb9f 100644 --- a/crates/adapters/activitypub/Cargo.toml +++ b/crates/adapters/activitypub/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.3" } +k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } domain = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/adapters/activitypub/src/composite_handler.rs b/crates/adapters/activitypub/src/composite_handler.rs index ba0ba69..0399ace 100644 --- a/crates/adapters/activitypub/src/composite_handler.rs +++ b/crates/adapters/activitypub/src/composite_handler.rs @@ -18,7 +18,9 @@ impl ApObjectHandler for CompositeObjectHandler { &self, user_id: uuid::Uuid, ) -> anyhow::Result> { - self.review.get_local_objects_for_user(user_id).await + let mut results = self.review.get_local_objects_for_user(user_id).await?; + results.extend(self.watchlist.get_local_objects_for_user(user_id).await?); + Ok(results) } async fn get_local_objects_page( diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 539053c..60059a0 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -3,7 +3,7 @@ use domain::ports::EventHandler; use domain::{ errors::DomainError, events::DomainEvent, - ports::{MovieRepository, ReviewRepository}, + ports::LocalApContentQuery, value_objects::{ReviewId, UserId}, }; use std::sync::Arc; @@ -15,22 +15,19 @@ use crate::urls::{actor_url, review_url}; pub struct ActivityPubEventHandler { ap_service: Arc, - movie_repository: Arc, - review_repository: Arc, + content_query: Arc, base_url: String, } impl ActivityPubEventHandler { pub fn new( ap_service: Arc, - movie_repository: Arc, - review_repository: Arc, + content_query: Arc, base_url: String, ) -> Self { Self { ap_service, - movie_repository, - review_repository, + content_query, base_url, } } @@ -90,7 +87,7 @@ impl EventHandler for ActivityPubEventHandler { impl ActivityPubEventHandler { async fn on_review_logged(&self, user_id: &UserId, review_id: &ReviewId) -> anyhow::Result<()> { - let review = match self.review_repository.get_review_by_id(review_id).await? { + let review = match self.content_query.get_review_by_id(review_id).await? { Some(r) => r, None => return Ok(()), }; @@ -99,7 +96,7 @@ impl ActivityPubEventHandler { let actor = actor_url(&self.base_url, user_id.value()); let movie = self - .movie_repository + .content_query .get_movie_by_id(review.movie_id()) .await .ok() @@ -108,10 +105,7 @@ impl ActivityPubEventHandler { .as_ref() .map(|m| m.title().value().to_string()) .unwrap_or_else(|| "Unknown".to_string()); - let release_year = movie - .as_ref() - .map(|m| m.release_year().value()) - .unwrap_or(0); + let release_year = movie.as_ref().map(|m| m.release_year().value()).unwrap_or(0); let poster_url = movie .as_ref() .and_then(|m| m.poster_path()) @@ -140,7 +134,7 @@ impl ActivityPubEventHandler { user_id: &UserId, review_id: &ReviewId, ) -> anyhow::Result<()> { - let review = match self.review_repository.get_review_by_id(review_id).await? { + let review = match self.content_query.get_review_by_id(review_id).await? { Some(r) => r, None => return Ok(()), }; @@ -149,7 +143,7 @@ impl ActivityPubEventHandler { let actor = actor_url(&self.base_url, user_id.value()); let movie = self - .movie_repository + .content_query .get_movie_by_id(review.movie_id()) .await .ok() @@ -158,10 +152,7 @@ impl ActivityPubEventHandler { .as_ref() .map(|m| m.title().value().to_string()) .unwrap_or_else(|| "Unknown".to_string()); - let release_year = movie - .as_ref() - .map(|m| m.release_year().value()) - .unwrap_or(0); + let release_year = movie.as_ref().map(|m| m.release_year().value()).unwrap_or(0); let poster_url = movie .as_ref() .and_then(|m| m.poster_path()) @@ -211,7 +202,7 @@ impl ActivityPubEventHandler { let actor = actor_url(&self.base_url, user_id.value()); let poster_url = self - .movie_repository + .content_query .get_movie_by_id(movie_id) .await .ok() diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index a3dc8e1..9dd9168 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -30,22 +30,21 @@ pub async fn wire( federation_repo: std::sync::Arc, review_store: std::sync::Arc, remote_watchlist_repo: std::sync::Arc, + local_ap_content: std::sync::Arc, user_repo: std::sync::Arc, - movie_repo: std::sync::Arc, - review_repo: std::sync::Arc, - diary_repo: std::sync::Arc, base_url: String, allow_registration: bool, _event_publisher: std::sync::Arc, ) -> anyhow::Result { let review_handler = std::sync::Arc::new(ReviewObjectHandler { - movie_repository: std::sync::Arc::clone(&movie_repo), - diary_repository: std::sync::Arc::clone(&diary_repo), + content_query: std::sync::Arc::clone(&local_ap_content), review_store, base_url: base_url.clone(), }); let watchlist_handler = std::sync::Arc::new(watchlist_handler::WatchlistObjectHandler { remote_watchlist_repo, + content_query: std::sync::Arc::clone(&local_ap_content), + base_url: base_url.clone(), }); let composite = std::sync::Arc::new(composite_handler::CompositeObjectHandler { review: review_handler, @@ -80,8 +79,7 @@ pub async fn wire( let router = concrete.router(); let event_handler = std::sync::Arc::new(ActivityPubEventHandler::new( std::sync::Arc::clone(&concrete), - movie_repo, - review_repo, + local_ap_content, base_url, )) as std::sync::Arc; diff --git a/crates/adapters/activitypub/src/review_handler.rs b/crates/adapters/activitypub/src/review_handler.rs index 163c56a..90cbd07 100644 --- a/crates/adapters/activitypub/src/review_handler.rs +++ b/crates/adapters/activitypub/src/review_handler.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use k_ap::ApObjectHandler; use async_trait::async_trait; use domain::{ - models::{Review, ReviewSource}, - ports::{DiaryRepository, MovieRepository}, + models::ReviewSource, + ports::LocalApContentQuery, value_objects::{Comment, MovieId, Rating, ReviewId, UserId}, }; use url::Url; @@ -14,8 +14,7 @@ use crate::remote_review_repository::RemoteReviewRepository; use crate::urls::{actor_url, review_url}; pub struct ReviewObjectHandler { - pub movie_repository: Arc, - pub diary_repository: Arc, + pub content_query: Arc, pub review_store: Arc, pub base_url: String, } @@ -27,51 +26,33 @@ impl ApObjectHandler for ReviewObjectHandler { user_id: uuid::Uuid, ) -> anyhow::Result> { let domain_user_id = UserId::from_uuid(user_id); - let history = self - .diary_repository - .get_user_history(&domain_user_id) - .await?; + let entries = self + .content_query + .get_local_reviews_for_user(&domain_user_id) + .await + .map_err(|e| anyhow::anyhow!(e.to_string()))?; + let actor = actor_url(&self.base_url, user_id); let mut results = Vec::new(); - for entry in history { + for entry in entries { let review = entry.review(); - if !matches!(review.source(), ReviewSource::Local) { - continue; - } + let movie = entry.movie(); let ap_id = review_url(&self.base_url, review.id()); - let actor_url = actor_url(&self.base_url, user_id); - - let movie = self - .movie_repository - .get_movie_by_id(review.movie_id()) - .await - .ok() - .flatten(); - let movie_title = movie - .as_ref() - .map(|m| m.title().value().to_string()) - .unwrap_or_else(|| "Unknown".to_string()); - let release_year = movie - .as_ref() - .map(|m| m.release_year().value()) - .unwrap_or(0); let poster_url = movie - .as_ref() - .and_then(|m| m.poster_path()) + .poster_path() .map(|p| format!("{}/images/{}", self.base_url, p.value())); let obj = review_to_ap_object( review, ap_id.clone(), - actor_url, - movie_title, - release_year, + actor.clone(), + movie.title().value().to_string(), + movie.release_year().value(), poster_url, &self.base_url, ); - let json = serde_json::to_value(obj)?; - results.push((ap_id, json)); + results.push((ap_id, serde_json::to_value(obj)?)); } Ok(results) } @@ -82,23 +63,18 @@ impl ApObjectHandler for ReviewObjectHandler { before: Option>, limit: usize, ) -> anyhow::Result)>> { - use domain::value_objects::UserId; - let domain_user_id = UserId::from_uuid(user_id); - let history = self - .diary_repository - .get_user_history(&domain_user_id) - .await?; + let entries = self + .content_query + .get_local_reviews_for_user(&domain_user_id) + .await + .map_err(|e| anyhow::anyhow!(e.to_string()))?; + let actor = actor_url(&self.base_url, user_id); let mut results = Vec::new(); - for entry in history { + for entry in entries { let review = entry.review(); - if !matches!(review.source(), ReviewSource::Local) { - continue; - } - - let published = - chrono::DateTime::from_naive_utc_and_offset(*review.watched_at(), chrono::Utc); + let published = chrono::DateTime::from_naive_utc_and_offset(*review.watched_at(), chrono::Utc); if let Some(cutoff) = before && published >= cutoff @@ -106,39 +82,22 @@ impl ApObjectHandler for ReviewObjectHandler { continue; } + let movie = entry.movie(); let ap_id = review_url(&self.base_url, review.id()); - let actor_url = actor_url(&self.base_url, user_id); - - let movie = self - .movie_repository - .get_movie_by_id(review.movie_id()) - .await - .ok() - .flatten(); - let movie_title = movie - .as_ref() - .map(|m| m.title().value().to_string()) - .unwrap_or_else(|| "Unknown".to_string()); - let release_year = movie - .as_ref() - .map(|m| m.release_year().value()) - .unwrap_or(0); let poster_url = movie - .as_ref() - .and_then(|m| m.poster_path()) + .poster_path() .map(|p| format!("{}/images/{}", self.base_url, p.value())); let obj = review_to_ap_object( review, ap_id.clone(), - actor_url, - movie_title, - release_year, + actor.clone(), + movie.title().value().to_string(), + movie.release_year().value(), poster_url, &self.base_url, ); - let json = serde_json::to_value(obj)?; - results.push((ap_id, json, published)); + results.push((ap_id, serde_json::to_value(obj)?, published)); if results.len() >= limit { break; @@ -174,7 +133,7 @@ impl ApObjectHandler for ReviewObjectHandler { let rating = Rating::new(obj.rating.min(5))?; let comment = obj.comment.map(Comment::new).transpose()?; - let review = Review::from_persistence( + let review = domain::models::Review::from_persistence( review_id, movie_id, user_id, @@ -242,7 +201,7 @@ impl ApObjectHandler for ReviewObjectHandler { } async fn count_local_posts(&self) -> anyhow::Result { - self.diary_repository + self.content_query .count_local_posts() .await .map_err(|e| anyhow::anyhow!(e.to_string())) diff --git a/crates/adapters/activitypub/src/watchlist_handler.rs b/crates/adapters/activitypub/src/watchlist_handler.rs index ea090ae..9ac843d 100644 --- a/crates/adapters/activitypub/src/watchlist_handler.rs +++ b/crates/adapters/activitypub/src/watchlist_handler.rs @@ -3,22 +3,61 @@ use std::sync::Arc; use k_ap::ApObjectHandler; use async_trait::async_trait; use chrono::Utc; -use domain::{models::RemoteWatchlistEntry, ports::RemoteWatchlistRepository}; +use domain::{ + models::RemoteWatchlistEntry, + ports::{LocalApContentQuery, RemoteWatchlistRepository}, + value_objects::UserId, +}; use url::Url; -use crate::objects::WatchlistObject; +use crate::{objects::{WatchlistObject, watchlist_to_ap_object}, urls::{actor_url, watchlist_entry_url}}; pub struct WatchlistObjectHandler { pub remote_watchlist_repo: Arc, + pub content_query: Arc, + pub base_url: String, } #[async_trait] impl ApObjectHandler for WatchlistObjectHandler { async fn get_local_objects_for_user( &self, - _user_id: uuid::Uuid, + user_id: uuid::Uuid, ) -> anyhow::Result> { - Ok(vec![]) + let domain_user_id = UserId::from_uuid(user_id); + let entries = self + .content_query + .get_local_watchlist_for_user(&domain_user_id) + .await + .map_err(|e| anyhow::anyhow!(e.to_string()))?; + + let actor = actor_url(&self.base_url, user_id); + let mut results = Vec::new(); + for wm in entries { + let movie_id = wm.entry.movie_id.value(); + let ap_id = watchlist_entry_url(&self.base_url, user_id, movie_id); + let added_at = chrono::DateTime::from_naive_utc_and_offset(wm.entry.added_at, Utc); + let external_metadata_id = wm + .movie + .external_metadata_id() + .map(|id| id.value().to_string()); + let poster_url = wm + .movie + .poster_path() + .map(|p| format!("{}/images/{}", self.base_url, p.value())); + let obj = watchlist_to_ap_object( + ap_id.clone(), + actor.clone(), + wm.movie.title().value().to_string(), + wm.movie.release_year().value(), + external_metadata_id, + poster_url, + added_at, + &self.base_url, + ); + results.push((ap_id, serde_json::to_value(obj)?)); + } + Ok(results) } async fn get_local_objects_page( diff --git a/crates/adapters/postgres-federation/Cargo.toml b/crates/adapters/postgres-federation/Cargo.toml index e67191d..f1e8600 100644 --- a/crates/adapters/postgres-federation/Cargo.toml +++ b/crates/adapters/postgres-federation/Cargo.toml @@ -12,7 +12,7 @@ sqlx = { version = "0.8.6", features = [ "chrono", ] } activitypub = { workspace = true } -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.3" } +k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } domain = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index 9899370..bd8aa79 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -631,6 +631,45 @@ impl FederationRepository for PostgresFederationRepository { .await?; Ok(count > 0) } + + async fn migrate_follower_actor( + &self, + old_actor_url: &str, + new_actor_url: &str, + ) -> Result> { + let candidates: Vec = sqlx::query_scalar( + "SELECT local_user_id FROM ap_following + WHERE remote_actor_url = $1 + AND local_user_id NOT IN ( + SELECT local_user_id FROM ap_following WHERE remote_actor_url = $2 + )", + ) + .bind(old_actor_url) + .bind(new_actor_url) + .fetch_all(&self.pool) + .await?; + + if candidates.is_empty() { + return Ok(vec![]); + } + + sqlx::query( + "UPDATE ap_following SET remote_actor_url = $1 + WHERE remote_actor_url = $2 + AND local_user_id NOT IN ( + SELECT local_user_id FROM ap_following WHERE remote_actor_url = $1 + )", + ) + .bind(new_actor_url) + .bind(old_actor_url) + .execute(&self.pool) + .await?; + + candidates + .into_iter() + .map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))) + .collect() + } } #[async_trait] diff --git a/crates/adapters/postgres/src/ap_content.rs b/crates/adapters/postgres/src/ap_content.rs new file mode 100644 index 0000000..a5fc5ae --- /dev/null +++ b/crates/adapters/postgres/src/ap_content.rs @@ -0,0 +1,148 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{DiaryEntry, Movie, Review, WatchlistEntry, WatchlistWithMovie}, + ports::LocalApContentQuery, + value_objects::{MovieId, ReviewId, UserId, WatchlistEntryId}, +}; +use sqlx::{PgPool, Row}; + +use crate::models::{DiaryRow, MovieRow, ReviewRow, parse_datetime, parse_uuid}; + +pub struct PostgresApContentQuery { + pool: PgPool, +} + +impl PostgresApContentQuery { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } +} + +#[async_trait] +impl LocalApContentQuery for PostgresApContentQuery { + async fn get_local_reviews_for_user( + &self, + user_id: &UserId, + ) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND r.remote_actor_url IS NULL + ORDER BY r.created_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + rows.into_iter().map(DiaryRow::into_domain).collect() + } + + async fn get_local_watchlist_for_user( + &self, + user_id: &UserId, + ) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let rows = sqlx::query( + "SELECT w.id, w.user_id, w.movie_id, + to_char(w.added_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS added_at, + m.id AS m_id, m.external_metadata_id, m.title, m.release_year, + m.director, m.poster_path + FROM watchlist_entries w + JOIN movies m ON m.id = w.movie_id + WHERE w.user_id = $1 + ORDER BY w.added_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + rows.into_iter() + .map(|row| { + let entry = WatchlistEntry { + id: WatchlistEntryId::from_uuid(parse_uuid( + &row.try_get::("id") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + )?), + user_id: UserId::from_uuid(parse_uuid( + &row.try_get::("user_id") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + )?), + movie_id: MovieId::from_uuid(parse_uuid( + &row.try_get::("movie_id") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + )?), + added_at: parse_datetime( + &row.try_get::("added_at") + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + )?, + }; + let movie = MovieRow { + id: row.try_get("m_id").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + external_metadata_id: row.try_get("external_metadata_id").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + title: row.try_get("title").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + release_year: row.try_get("release_year").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + director: row.try_get("director").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + poster_path: row.try_get("poster_path").map_err(|e| DomainError::InfrastructureError(e.to_string()))?, + } + .into_domain()?; + Ok(WatchlistWithMovie { entry, movie }) + }) + .collect() + } + + async fn get_review_by_id( + &self, + review_id: &ReviewId, + ) -> Result, DomainError> { + let id = review_id.value().to_string(); + sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + remote_actor_url + FROM reviews WHERE id = $1", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(ReviewRow::into_domain) + .transpose() + } + + async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { + let id = movie_id.value().to_string(); + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE id = $1", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(MovieRow::into_domain) + .transpose() + } + + async fn count_local_posts(&self) -> Result { + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE remote_actor_url IS NULL") + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(count as u64) + } +} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index e44a9f0..036ea4e 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -12,6 +12,7 @@ use domain::{ }; use sqlx::PgPool; +mod ap_content; mod image_ref; mod import_profile; mod import_session; @@ -27,6 +28,7 @@ use models::{ MovieSummaryRow, ReviewRow, UserTotalsRow, datetime_to_str, }; +pub use ap_content::PostgresApContentQuery; pub use image_ref::{PostgresImageRefAdapter, create_image_ref}; pub use import_profile::PostgresImportProfileRepository; pub use import_session::PostgresImportSessionRepository; @@ -949,6 +951,7 @@ pub async fn wire( std::sync::Arc, std::sync::Arc, std::sync::Arc, + std::sync::Arc, )> { use anyhow::Context; @@ -968,6 +971,7 @@ pub async fn wire( std::sync::Arc::new(PostgresImportProfileRepository::new(pool.clone())); let movie_profile_repo = std::sync::Arc::new(PostgresMovieProfileRepository::new(pool.clone())); let watchlist_repo = std::sync::Arc::new(PostgresWatchlistRepository::new(pool.clone())); + let ap_content = std::sync::Arc::new(PostgresApContentQuery::new(pool.clone())); Ok(( pool.clone(), @@ -980,5 +984,6 @@ pub async fn wire( import_profile_repo as _, movie_profile_repo as _, watchlist_repo as _, + ap_content as _, )) } diff --git a/crates/adapters/sqlite-federation/Cargo.toml b/crates/adapters/sqlite-federation/Cargo.toml index 4f27e8b..12b2af8 100644 --- a/crates/adapters/sqlite-federation/Cargo.toml +++ b/crates/adapters/sqlite-federation/Cargo.toml @@ -6,7 +6,7 @@ edition = "2024" [dependencies] sqlx = { workspace = true } activitypub = { workspace = true } -k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.3" } +k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" } domain = { workspace = true } anyhow = { workspace = true } uuid = { workspace = true } diff --git a/crates/adapters/sqlite-federation/src/lib.rs b/crates/adapters/sqlite-federation/src/lib.rs index 89674d5..8bdbb07 100644 --- a/crates/adapters/sqlite-federation/src/lib.rs +++ b/crates/adapters/sqlite-federation/src/lib.rs @@ -656,6 +656,45 @@ impl FederationRepository for SqliteFederationRepository { .await?; Ok(count > 0) } + + async fn migrate_follower_actor( + &self, + old_actor_url: &str, + new_actor_url: &str, + ) -> Result> { + let candidates: Vec = sqlx::query_scalar( + "SELECT local_user_id FROM ap_following + WHERE remote_actor_url = ?1 + AND local_user_id NOT IN ( + SELECT local_user_id FROM ap_following WHERE remote_actor_url = ?2 + )", + ) + .bind(old_actor_url) + .bind(new_actor_url) + .fetch_all(&self.pool) + .await?; + + if candidates.is_empty() { + return Ok(vec![]); + } + + sqlx::query( + "UPDATE ap_following SET remote_actor_url = ?1 + WHERE remote_actor_url = ?2 + AND local_user_id NOT IN ( + SELECT local_user_id FROM ap_following WHERE remote_actor_url = ?1 + )", + ) + .bind(new_actor_url) + .bind(old_actor_url) + .execute(&self.pool) + .await?; + + candidates + .into_iter() + .map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))) + .collect() + } } // --- Content-specific repository (movies-diary) --- diff --git a/crates/adapters/sqlite/src/ap_content.rs b/crates/adapters/sqlite/src/ap_content.rs new file mode 100644 index 0000000..4011082 --- /dev/null +++ b/crates/adapters/sqlite/src/ap_content.rs @@ -0,0 +1,109 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{DiaryEntry, Movie, Review, WatchlistWithMovie}, + ports::LocalApContentQuery, + value_objects::{MovieId, ReviewId, UserId}, +}; +use sqlx::SqlitePool; + +use crate::models::{DiaryRow, MovieRow, ReviewRow, WatchlistRow}; + +pub struct SqliteApContentQuery { + pool: SqlitePool, +} + +impl SqliteApContentQuery { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } +} + +#[async_trait] +impl LocalApContentQuery for SqliteApContentQuery { + async fn get_local_reviews_for_user( + &self, + user_id: &UserId, + ) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? AND r.remote_actor_url IS NULL + ORDER BY r.created_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + rows.into_iter().map(DiaryRow::into_domain).collect() + } + + async fn get_local_watchlist_for_user( + &self, + user_id: &UserId, + ) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let rows: Vec = sqlx::query_as( + "SELECT w.id, w.user_id, w.movie_id, w.added_at, + m.id AS m_id, m.external_metadata_id, m.title, m.release_year, + m.director, m.poster_path + FROM watchlist_entries w + JOIN movies m ON m.id = w.movie_id + WHERE w.user_id = ? + ORDER BY w.added_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + rows.into_iter().map(WatchlistRow::into_domain).collect() + } + + async fn get_review_by_id( + &self, + review_id: &ReviewId, + ) -> Result, DomainError> { + let id = review_id.value().to_string(); + sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url + FROM reviews WHERE id = ?", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(ReviewRow::into_domain) + .transpose() + } + + async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { + let id = movie_id.value().to_string(); + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE id = ?", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(MovieRow::into_domain) + .transpose() + } + + async fn count_local_posts(&self) -> Result { + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE remote_actor_url IS NULL") + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(count as u64) + } +} diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index 704dcfc..370943e 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -12,6 +12,7 @@ use domain::{ }; use sqlx::SqlitePool; +mod ap_content; mod image_ref; mod import_profile; mod import_session; @@ -28,6 +29,7 @@ use models::{ MovieSummaryRow, ReviewRow, UserTotalsRow, datetime_to_str, }; +pub use ap_content::SqliteApContentQuery; pub use image_ref::{SqliteImageRefAdapter, create_image_ref}; pub use import_profile::SqliteImportProfileRepository; pub use import_session::SqliteImportSessionRepository; @@ -944,6 +946,7 @@ pub async fn wire( std::sync::Arc, std::sync::Arc, std::sync::Arc, + std::sync::Arc, )> { use anyhow::Context; use sqlx::sqlite::SqliteConnectOptions; @@ -968,6 +971,7 @@ pub async fn wire( let import_profile_repo = std::sync::Arc::new(SqliteImportProfileRepository::new(pool.clone())); let movie_profile_repo = std::sync::Arc::new(SqliteMovieProfileRepository::new(pool.clone())); let watchlist_repo = std::sync::Arc::new(SqliteWatchlistRepository::new(pool.clone())); + let ap_content = std::sync::Arc::new(SqliteApContentQuery::new(pool.clone())); Ok(( pool.clone(), @@ -980,6 +984,7 @@ pub async fn wire( import_profile_repo as _, movie_profile_repo as _, watchlist_repo as _, + ap_content as _, )) } diff --git a/crates/application/src/test_helpers.rs b/crates/application/src/test_helpers.rs index dfee9a6..85ff15d 100644 --- a/crates/application/src/test_helpers.rs +++ b/crates/application/src/test_helpers.rs @@ -18,6 +18,8 @@ use domain::{ PanicSearchPort, PanicStatsRepository, }, }; +#[cfg(feature = "federation")] +use domain::testing::PanicRemoteWatchlistRepository; use crate::{ config::AppConfig, @@ -143,6 +145,8 @@ impl TestContextBuilder { search_port: self.search_port, search_command: self.search_command, config: self.config, + #[cfg(feature = "federation")] + remote_watchlist_repository: std::sync::Arc::new(PanicRemoteWatchlistRepository), } } } diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index e81b282..1eb9132 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -375,3 +375,27 @@ pub trait RemoteWatchlistRepository: Send + Sync { uuid: uuid::Uuid, ) -> Result, DomainError>; } + +/// Read-only query port used exclusively by the ActivityPub adapter. +/// Consolidates all reads the AP adapter needs so it never touches write repositories. +#[async_trait] +pub trait LocalApContentQuery: Send + Sync { + async fn get_local_reviews_for_user( + &self, + user_id: &UserId, + ) -> Result, DomainError>; + + async fn get_local_watchlist_for_user( + &self, + user_id: &UserId, + ) -> Result, DomainError>; + + async fn get_review_by_id( + &self, + review_id: &ReviewId, + ) -> Result, DomainError>; + + async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError>; + + async fn count_local_posts(&self) -> Result; +} diff --git a/crates/domain/src/testing.rs b/crates/domain/src/testing.rs index 8343654..c2ef607 100644 --- a/crates/domain/src/testing.rs +++ b/crates/domain/src/testing.rs @@ -668,6 +668,27 @@ impl DocumentParser for PanicDocumentParser { // ── PanicProfileFieldsRepo ──────────────────────────────────────────────────── +pub struct PanicRemoteWatchlistRepository; + +#[async_trait] +impl crate::ports::RemoteWatchlistRepository for PanicRemoteWatchlistRepository { + async fn save(&self, _: crate::models::RemoteWatchlistEntry) -> Result<(), DomainError> { + panic!("PanicRemoteWatchlistRepository called") + } + async fn remove_by_ap_id(&self, _: &str, _: &str) -> Result<(), DomainError> { + panic!("PanicRemoteWatchlistRepository called") + } + async fn get_by_actor_url(&self, _: &str) -> Result, DomainError> { + panic!("PanicRemoteWatchlistRepository called") + } + async fn remove_all_by_actor(&self, _: &str) -> Result<(), DomainError> { + panic!("PanicRemoteWatchlistRepository called") + } + async fn get_by_derived_uuid(&self, _: uuid::Uuid) -> Result, DomainError> { + panic!("PanicRemoteWatchlistRepository called") + } +} + pub struct PanicProfileFieldsRepo; #[async_trait] diff --git a/crates/presentation/src/factory.rs b/crates/presentation/src/factory.rs index e11c3c6..65e7d4c 100644 --- a/crates/presentation/src/factory.rs +++ b/crates/presentation/src/factory.rs @@ -4,10 +4,10 @@ use anyhow::Context; use domain::ports::{ AuthService, DiaryRepository, ImageStorage, ImportProfileRepository, - ImportSessionRepository, MetadataClient, MovieProfileRepository, MovieRepository, - PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient, ReviewRepository, - SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository, UserRepository, - WatchlistRepository, + ImportSessionRepository, LocalApContentQuery, MetadataClient, MovieProfileRepository, + MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient, + ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository, + UserRepository, WatchlistRepository, }; pub struct DatabaseAdapters { @@ -20,6 +20,7 @@ pub struct DatabaseAdapters { pub import_profile_repo: Arc, pub movie_profile_repo: Arc, pub watchlist_repo: Arc, + pub ap_content_repo: Arc, pub person_command: Arc, pub person_query: Arc, pub search_port: Arc, @@ -42,7 +43,7 @@ pub async fn build_database_adapters( match backend { #[cfg(feature = "postgres")] "postgres" => { - let (pool, m, r, d, s, u, is, ip, mp, wl) = postgres::wire(url) + let (pool, m, r, d, s, u, is, ip, mp, wl, ac) = postgres::wire(url) .await .context("PostgreSQL connection failed")?; let (pc, pq) = postgres::create_person_adapter(pool.clone()); @@ -58,6 +59,7 @@ pub async fn build_database_adapters( import_profile_repo: ip, movie_profile_repo: mp, watchlist_repo: wl, + ap_content_repo: ac, person_command: pc, person_query: pq, search_port: sp, @@ -68,7 +70,7 @@ pub async fn build_database_adapters( } #[cfg(feature = "sqlite")] _ => { - let (pool, m, r, d, s, u, is, ip, mp, wl) = sqlite::wire(url) + let (pool, m, r, d, s, u, is, ip, mp, wl, ac) = sqlite::wire(url) .await .context("SQLite connection failed")?; let (pc, pq) = sqlite::create_person_adapter(pool.clone()); @@ -84,6 +86,7 @@ pub async fn build_database_adapters( import_profile_repo: ip, movie_profile_repo: mp, watchlist_repo: wl, + ap_content_repo: ac, person_command: pc, person_query: pq, search_port: sp, diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 1c1998c..404f80a 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -69,6 +69,7 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let import_profile_repository = db.import_profile_repo; let movie_profile_repository = db.movie_profile_repo; let watchlist_repository = db.watchlist_repo; + let ap_content_repo = db.ap_content_repo; let person_command = db.person_command; let person_query = db.person_query; let search_port = db.search_port; @@ -121,10 +122,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { federation_repo, review_store, remote_watchlist_repo.clone(), + Arc::clone(&ap_content_repo), Arc::clone(&user_repository), - Arc::clone(&movie_repository), - Arc::clone(&review_repository), - Arc::clone(&diary_repository), app_config.base_url.clone(), app_config.allow_registration, Arc::clone(&ep), diff --git a/crates/worker/src/db.rs b/crates/worker/src/db.rs index db9f32d..94887c2 100644 --- a/crates/worker/src/db.rs +++ b/crates/worker/src/db.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use anyhow::Context; use domain::ports::{ DiaryRepository, ImageRefCommand, ImageRefQuery, ImportProfileRepository, - ImportSessionRepository, MovieProfileRepository, MovieRepository, PersonCommand, PersonQuery, - ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository, - UserRepository, WatchlistRepository, + ImportSessionRepository, LocalApContentQuery, MovieProfileRepository, MovieRepository, + PersonCommand, PersonQuery, ReviewRepository, SearchCommand, SearchPort, StatsRepository, + UserProfileFieldsRepository, UserRepository, WatchlistRepository, }; pub enum DbPool { @@ -25,6 +25,7 @@ pub struct Repos { pub import_profile: Arc, pub movie_profile: Arc, pub watchlist: Arc, + pub ap_content: Arc, pub image_ref_command: Arc, pub image_ref_query: Arc, pub person_command: Arc, @@ -38,7 +39,7 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos match backend { #[cfg(feature = "postgres")] "postgres" => { - let (pool, m, r, d, s, u, is, ip, mp, wl) = postgres::wire(database_url) + let (pool, m, r, d, s, u, is, ip, mp, wl, ac) = postgres::wire(database_url) .await .context("PostgreSQL connection failed")?; let (image_ref_command, image_ref_query) = postgres::create_image_ref(pool.clone()); @@ -57,6 +58,7 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos import_profile: ip, movie_profile: mp, watchlist: wl, + ap_content: ac, image_ref_command, image_ref_query, person_command, @@ -70,7 +72,7 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos } #[cfg(feature = "sqlite")] _ => { - let (pool, m, r, d, s, u, is, ip, mp, wl) = sqlite::wire(database_url) + let (pool, m, r, d, s, u, is, ip, mp, wl, ac) = sqlite::wire(database_url) .await .context("SQLite connection failed")?; let (image_ref_command, image_ref_query) = sqlite::create_image_ref(pool.clone()); @@ -88,6 +90,7 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos import_profile: ip, movie_profile: mp, watchlist: wl, + ap_content: ac, image_ref_command, image_ref_query, person_command, diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 390fb47..15f2a4a 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -48,16 +48,12 @@ async fn main() -> anyhow::Result<()> { // Clone refs federation handler needs before ctx consumes them. #[cfg(feature = "federation")] let ( - fed_movie_repo, - fed_review_repo, - fed_diary_repo, + fed_ap_content, fed_user_repo, base_url, allow_registration, ) = ( - Arc::clone(&repos.movie), - Arc::clone(&repos.review), - Arc::clone(&repos.diary), + Arc::clone(&repos.ap_content), Arc::clone(&repos.user), app_config.base_url.clone(), app_config.allow_registration, @@ -202,10 +198,8 @@ async fn main() -> anyhow::Result<()> { fed_federation_repo, fed_review_store, fed_remote_watchlist_repo, + fed_ap_content, fed_user_repo, - fed_movie_repo, - fed_review_repo, - fed_diary_repo, base_url, allow_registration, Arc::clone(&ctx.event_publisher),