feat: add federation support with SQLite and Postgres repositories

This commit is contained in:
2026-05-10 18:22:24 +02:00
parent 660a8d618d
commit 3714b6d7a7
3 changed files with 81 additions and 6 deletions

3
Cargo.lock generated
View File

@@ -6234,6 +6234,7 @@ dependencies = [
name = "worker" name = "worker"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"activitypub",
"anyhow", "anyhow",
"application", "application",
"async-trait", "async-trait",
@@ -6250,10 +6251,12 @@ dependencies = [
"poster-storage", "poster-storage",
"postgres", "postgres",
"postgres-event-queue", "postgres-event-queue",
"postgres-federation",
"serde", "serde",
"serde_json", "serde_json",
"sqlite", "sqlite",
"sqlite-event-queue", "sqlite-event-queue",
"sqlite-federation",
"sqlx", "sqlx",
"thiserror 2.0.18", "thiserror 2.0.18",
"tokio", "tokio",

View File

@@ -8,6 +8,9 @@ default = ["sqlite"]
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"] sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
postgres = ["dep:postgres", "dep:postgres-event-queue"] postgres = ["dep:postgres", "dep:postgres-event-queue"]
nats = ["dep:nats"] nats = ["dep:nats"]
federation = []
sqlite-federation = ["sqlite", "dep:sqlite-federation", "dep:activitypub", "federation"]
postgres-federation = ["postgres", "dep:postgres-federation", "dep:activitypub", "federation"]
[dependencies] [dependencies]
domain = { workspace = true } domain = { workspace = true }
@@ -38,3 +41,8 @@ sqlite = { workspace = true, optional = true }
postgres = { workspace = true, optional = true } postgres = { workspace = true, optional = true }
sqlite-event-queue = { workspace = true, optional = true } sqlite-event-queue = { workspace = true, optional = true }
postgres-event-queue = { workspace = true, optional = true } postgres-event-queue = { workspace = true, optional = true }
# Optional — federation
activitypub = { workspace = true, optional = true }
sqlite-federation = { workspace = true, optional = true }
postgres-federation = { workspace = true, optional = true }

View File

@@ -16,8 +16,18 @@ use sqlite::{SqliteMovieRepository, SqliteUserRepository};
#[cfg(feature = "postgres")] #[cfg(feature = "postgres")]
use postgres::{PostgresRepository, PostgresUserRepository}; use postgres::{PostgresRepository, PostgresUserRepository};
#[cfg(feature = "sqlite-federation")]
use sqlite_federation::SqliteFederationRepository;
#[cfg(feature = "postgres-federation")]
use postgres_federation::PostgresFederationRepository;
#[cfg(feature = "federation")]
use activitypub::{
ActivityPubEventHandler, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler,
};
use domain::ports::{ use domain::ports::{
AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository, AuthService, DiaryExporter, DiaryRepository, EventHandler, MetadataClient, MovieRepository,
PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository,
UserRepository, UserRepository,
}; };
@@ -73,11 +83,11 @@ async fn main() -> anyhow::Result<()> {
) = match EventBusBackend::from_env()? { ) = match EventBusBackend::from_env()? {
EventBusBackend::Db => { EventBusBackend::Db => {
tracing::info!("event bus: DB queue"); tracing::info!("event bus: DB queue");
match db_pool { match &db_pool {
#[cfg(feature = "postgres")] #[cfg(feature = "postgres")]
DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool).await?, DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool.clone()).await?,
#[cfg(feature = "sqlite")] #[cfg(feature = "sqlite")]
DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool).await?, DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool.clone()).await?,
} }
} }
#[cfg(feature = "nats")] #[cfg(feature = "nats")]
@@ -89,6 +99,16 @@ async fn main() -> anyhow::Result<()> {
} }
}; };
// Clone what federation handler needs before ctx and app_config are consumed.
#[cfg(feature = "federation")]
let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url) = (
Arc::clone(&movie_repository),
Arc::clone(&review_repository),
Arc::clone(&diary_repository),
Arc::clone(&user_repository),
app_config.base_url.clone(),
);
let ctx = AppContext { let ctx = AppContext {
movie_repository, movie_repository,
review_repository, review_repository,
@@ -105,8 +125,52 @@ async fn main() -> anyhow::Result<()> {
config: app_config, config: app_config,
}; };
let poster_handler = Arc::new(PosterSyncHandler::new(ctx, 3)); let mut handlers: Vec<Arc<dyn EventHandler>> = vec![Arc::new(PosterSyncHandler::new(ctx, 3))];
let worker = WorkerService::new(consumer_arc, vec![poster_handler]);
#[cfg(feature = "federation")]
{
let (federation_repo, review_store): (
Arc<dyn activitypub::FederationRepository>,
Arc<dyn activitypub::RemoteReviewRepository>,
) = match &db_pool {
#[cfg(feature = "sqlite-federation")]
DbPool::Sqlite(pool) => {
let fed = Arc::new(SqliteFederationRepository::new(pool.clone()));
(Arc::clone(&fed) as _, fed as _)
}
#[cfg(feature = "postgres-federation")]
DbPool::Postgres(pool) => {
let fed = Arc::new(PostgresFederationRepository::new(pool.clone()));
(Arc::clone(&fed) as _, fed as _)
}
};
let ap_service = Arc::new(
ActivityPubService::new(
federation_repo,
Arc::new(DomainUserRepoAdapter(fed_user_repo)),
Arc::new(ReviewObjectHandler {
movie_repository: Arc::clone(&fed_movie_repo),
diary_repository: fed_diary_repo,
review_store,
base_url: base_url.clone(),
}),
base_url.clone(),
cfg!(debug_assertions),
)
.await?,
);
handlers.push(Arc::new(ActivityPubEventHandler::new(
ap_service,
fed_movie_repo,
fed_review_repo,
base_url,
)));
tracing::info!("federation event handler registered");
}
let worker = WorkerService::new(consumer_arc, handlers);
tracing::info!("worker started"); tracing::info!("worker started");
worker.run().await; worker.run().await;