From af25a43bbc8fccff5d10295e2cf25ba3c1334f24 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 10 May 2026 18:22:24 +0200 Subject: [PATCH] feat: add federation support with SQLite and Postgres repositories --- Cargo.lock | 3 ++ crates/worker/Cargo.toml | 8 +++++ crates/worker/src/main.rs | 76 +++++++++++++++++++++++++++++++++++---- 3 files changed, 81 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bad9056..a526cd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6234,6 +6234,7 @@ dependencies = [ name = "worker" version = "0.1.0" dependencies = [ + "activitypub", "anyhow", "application", "async-trait", @@ -6250,10 +6251,12 @@ dependencies = [ "poster-storage", "postgres", "postgres-event-queue", + "postgres-federation", "serde", "serde_json", "sqlite", "sqlite-event-queue", + "sqlite-federation", "sqlx", "thiserror 2.0.18", "tokio", diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index fa8169a..66035ae 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -8,6 +8,9 @@ default = ["sqlite"] sqlite = ["dep:sqlite", "dep:sqlite-event-queue"] postgres = ["dep:postgres", "dep:postgres-event-queue"] nats = ["dep:nats"] +federation = [] +sqlite-federation = ["sqlite", "dep:sqlite-federation", "dep:activitypub", "federation"] +postgres-federation = ["postgres", "dep:postgres-federation", "dep:activitypub", "federation"] [dependencies] domain = { workspace = true } @@ -38,3 +41,8 @@ sqlite = { workspace = true, optional = true } postgres = { workspace = true, optional = true } sqlite-event-queue = { workspace = true, optional = true } postgres-event-queue = { workspace = true, optional = true } + +# Optional — federation +activitypub = { workspace = true, optional = true } +sqlite-federation = { workspace = true, optional = true } +postgres-federation = { workspace = true, optional = true } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 71bfb3b..7eb4d8f 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -16,8 +16,18 @@ use sqlite::{SqliteMovieRepository, SqliteUserRepository}; #[cfg(feature = "postgres")] use postgres::{PostgresRepository, PostgresUserRepository}; +#[cfg(feature = "sqlite-federation")] +use sqlite_federation::SqliteFederationRepository; +#[cfg(feature = "postgres-federation")] +use postgres_federation::PostgresFederationRepository; + +#[cfg(feature = "federation")] +use activitypub::{ + ActivityPubEventHandler, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler, +}; + use domain::ports::{ - AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository, + AuthService, DiaryExporter, DiaryRepository, EventHandler, MetadataClient, MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository, UserRepository, }; @@ -73,11 +83,11 @@ async fn main() -> anyhow::Result<()> { ) = match EventBusBackend::from_env()? { EventBusBackend::Db => { tracing::info!("event bus: DB queue"); - match db_pool { + match &db_pool { #[cfg(feature = "postgres")] - DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool).await?, + DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool.clone()).await?, #[cfg(feature = "sqlite")] - DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool).await?, + DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool.clone()).await?, } } #[cfg(feature = "nats")] @@ -89,6 +99,16 @@ async fn main() -> anyhow::Result<()> { } }; + // Clone what federation handler needs before ctx and app_config are consumed. + #[cfg(feature = "federation")] + let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url) = ( + Arc::clone(&movie_repository), + Arc::clone(&review_repository), + Arc::clone(&diary_repository), + Arc::clone(&user_repository), + app_config.base_url.clone(), + ); + let ctx = AppContext { movie_repository, review_repository, @@ -105,8 +125,52 @@ async fn main() -> anyhow::Result<()> { config: app_config, }; - let poster_handler = Arc::new(PosterSyncHandler::new(ctx, 3)); - let worker = WorkerService::new(consumer_arc, vec![poster_handler]); + let mut handlers: Vec> = vec![Arc::new(PosterSyncHandler::new(ctx, 3))]; + + #[cfg(feature = "federation")] + { + let (federation_repo, review_store): ( + Arc, + Arc, + ) = match &db_pool { + #[cfg(feature = "sqlite-federation")] + DbPool::Sqlite(pool) => { + let fed = Arc::new(SqliteFederationRepository::new(pool.clone())); + (Arc::clone(&fed) as _, fed as _) + } + #[cfg(feature = "postgres-federation")] + DbPool::Postgres(pool) => { + let fed = Arc::new(PostgresFederationRepository::new(pool.clone())); + (Arc::clone(&fed) as _, fed as _) + } + }; + + let ap_service = Arc::new( + ActivityPubService::new( + federation_repo, + Arc::new(DomainUserRepoAdapter(fed_user_repo)), + Arc::new(ReviewObjectHandler { + movie_repository: Arc::clone(&fed_movie_repo), + diary_repository: fed_diary_repo, + review_store, + base_url: base_url.clone(), + }), + base_url.clone(), + cfg!(debug_assertions), + ) + .await?, + ); + + handlers.push(Arc::new(ActivityPubEventHandler::new( + ap_service, + fed_movie_repo, + fed_review_repo, + base_url, + ))); + tracing::info!("federation event handler registered"); + } + + let worker = WorkerService::new(consumer_arc, handlers); tracing::info!("worker started"); worker.run().await;