feat: add axum dependency and implement wiring functions for federation repositories

This commit is contained in:
2026-05-10 18:58:41 +02:00
parent 810d051dee
commit e461c689d9
12 changed files with 138 additions and 130 deletions

2
Cargo.lock generated
View File

@@ -10,6 +10,7 @@ dependencies = [
"activitypub_federation", "activitypub_federation",
"anyhow", "anyhow",
"async-trait", "async-trait",
"axum",
"chrono", "chrono",
"domain", "domain",
"serde", "serde",
@@ -2761,6 +2762,7 @@ dependencies = [
name = "metadata" name = "metadata"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"async-trait", "async-trait",
"domain", "domain",
"reqwest 0.13.3", "reqwest 0.13.3",

View File

@@ -6,6 +6,7 @@ edition = "2024"
[dependencies] [dependencies]
activitypub-base = { workspace = true } activitypub-base = { workspace = true }
domain = { workspace = true } domain = { workspace = true }
axum = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }

View File

@@ -17,3 +17,49 @@ pub use port::{ActivityPubPort, NoopActivityPubService};
pub use remote_review_repository::RemoteReviewRepository; pub use remote_review_repository::RemoteReviewRepository;
pub use review_handler::ReviewObjectHandler; pub use review_handler::ReviewObjectHandler;
pub use user_adapter::DomainUserRepoAdapter; pub use user_adapter::DomainUserRepoAdapter;
pub struct ActivityPubWire {
pub service: std::sync::Arc<dyn ActivityPubPort>,
pub router: axum::Router,
pub event_handler: std::sync::Arc<dyn domain::ports::EventHandler>,
}
pub async fn wire(
federation_repo: std::sync::Arc<dyn FederationRepository>,
review_store: std::sync::Arc<dyn RemoteReviewRepository>,
user_repo: std::sync::Arc<dyn domain::ports::UserRepository>,
movie_repo: std::sync::Arc<dyn domain::ports::MovieRepository>,
review_repo: std::sync::Arc<dyn domain::ports::ReviewRepository>,
diary_repo: std::sync::Arc<dyn domain::ports::DiaryRepository>,
base_url: String,
) -> anyhow::Result<ActivityPubWire> {
let concrete = std::sync::Arc::new(
ActivityPubService::new(
federation_repo,
std::sync::Arc::new(DomainUserRepoAdapter(user_repo)),
std::sync::Arc::new(ReviewObjectHandler {
movie_repository: std::sync::Arc::clone(&movie_repo),
diary_repository: diary_repo,
review_store,
base_url: base_url.clone(),
}),
base_url.clone(),
cfg!(debug_assertions),
)
.await?,
);
let router = concrete.router();
let event_handler = std::sync::Arc::new(ActivityPubEventHandler::new(
std::sync::Arc::clone(&concrete),
movie_repo,
review_repo,
base_url,
)) as std::sync::Arc<dyn domain::ports::EventHandler>;
Ok(ActivityPubWire {
service: concrete as std::sync::Arc<dyn ActivityPubPort>,
router,
event_handler,
})
}

View File

@@ -105,3 +105,14 @@ impl PasswordHasher for Argon2PasswordHasher {
.is_ok()) .is_ok())
} }
} }
pub fn create() -> anyhow::Result<(
std::sync::Arc<dyn domain::ports::AuthService>,
std::sync::Arc<dyn domain::ports::PasswordHasher>,
)> {
let config = AuthConfig::from_env()?;
Ok((
std::sync::Arc::new(JwtAuthService::new(config)),
std::sync::Arc::new(Argon2PasswordHasher),
))
}

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
reqwest = { workspace = true } reqwest = { workspace = true }
serde = { workspace = true } serde = { workspace = true }

View File

@@ -65,3 +65,14 @@ impl MetadataClient for MetadataClientImpl {
Ok(pm.poster_url) Ok(pm.poster_url)
} }
} }
pub fn create() -> anyhow::Result<std::sync::Arc<dyn domain::ports::MetadataClient>> {
use anyhow::Context;
if let Ok(key) = std::env::var("TMDB_API_KEY") {
Ok(std::sync::Arc::new(MetadataClientImpl::new_tmdb(key)))
} else {
let key = std::env::var("OMDB_API_KEY")
.context("either TMDB_API_KEY or OMDB_API_KEY must be set")?;
Ok(std::sync::Arc::new(MetadataClientImpl::new_omdb(key)))
}
}

View File

@@ -36,3 +36,7 @@ impl PosterFetcherClient for ReqwestPosterFetcher {
Ok(bytes.to_vec()) Ok(bytes.to_vec())
} }
} }
pub fn create() -> anyhow::Result<std::sync::Arc<dyn domain::ports::PosterFetcherClient>> {
Ok(std::sync::Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?))
}

View File

@@ -68,6 +68,10 @@ impl PosterStorage for PosterStorageAdapter {
} }
} }
pub fn create() -> anyhow::Result<std::sync::Arc<dyn domain::ports::PosterStorage>> {
Ok(std::sync::Arc::new(PosterStorageAdapter::from_config(StorageConfig::from_env()?)))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@@ -474,3 +474,16 @@ impl domain::ports::SocialQueryPort for PostgresFederationRepository {
Ok(rows.into_iter().map(|(url, handle, display_name)| domain::ports::RemoteActorInfo { url, handle, display_name }).collect()) Ok(rows.into_iter().map(|(url, handle, display_name)| domain::ports::RemoteActorInfo { url, handle, display_name }).collect())
} }
} }
pub fn wire(pool: sqlx::PgPool) -> (
std::sync::Arc<dyn activitypub::FederationRepository>,
std::sync::Arc<dyn domain::ports::SocialQueryPort>,
std::sync::Arc<dyn activitypub::RemoteReviewRepository>,
) {
let fed = std::sync::Arc::new(PostgresFederationRepository::new(pool));
(
std::sync::Arc::clone(&fed) as _,
std::sync::Arc::clone(&fed) as _,
fed as _,
)
}

View File

@@ -537,6 +537,19 @@ impl domain::ports::SocialQueryPort for SqliteFederationRepository {
} }
} }
pub fn wire(pool: sqlx::SqlitePool) -> (
std::sync::Arc<dyn activitypub::FederationRepository>,
std::sync::Arc<dyn domain::ports::SocialQueryPort>,
std::sync::Arc<dyn activitypub::RemoteReviewRepository>,
) {
let fed = std::sync::Arc::new(SqliteFederationRepository::new(pool));
(
std::sync::Arc::clone(&fed) as _,
std::sync::Arc::clone(&fed) as _,
fed as _,
)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@@ -5,23 +5,8 @@ use anyhow::Context;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[cfg(feature = "sqlite-federation")]
use sqlite_federation::SqliteFederationRepository;
#[cfg(feature = "postgres-federation")]
use postgres_federation::PostgresFederationRepository;
#[cfg(feature = "federation")]
use activitypub::{
ActivityPubPort, ActivityPubService, DomainUserRepoAdapter,
ReviewObjectHandler,
};
use application::{config::AppConfig, context::AppContext}; use application::{config::AppConfig, context::AppContext};
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
use export::ExportAdapter; use export::ExportAdapter;
use metadata::MetadataClientImpl;
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
use poster_storage::{PosterStorageAdapter, StorageConfig};
use rss::RssAdapter; use rss::RssAdapter;
use template_askama::AskamaHtmlRenderer; use template_askama::AskamaHtmlRenderer;
@@ -29,10 +14,7 @@ use doc::ApiDocExt;
use presentation::{openapi::ApiDoc, routes, state::AppState}; use presentation::{openapi::ApiDoc, routes, state::AppState};
use utoipa::OpenApi as _; use utoipa::OpenApi as _;
use domain::ports::{ use domain::ports::{DiaryExporter, EventPublisher};
AuthService, DiaryExporter, EventPublisher, MetadataClient,
PasswordHasher, PosterFetcherClient, PosterStorage,
};
#[cfg(not(any(feature = "sqlite", feature = "postgres")))] #[cfg(not(any(feature = "sqlite", feature = "postgres")))]
compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres"); compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres");
@@ -59,26 +41,14 @@ async fn main() -> anyhow::Result<()> {
} }
async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
let auth_config = AuthConfig::from_env()?;
let storage_config = StorageConfig::from_env()?;
let app_config = AppConfig::from_env(); let app_config = AppConfig::from_env();
let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?; let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string()); let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string());
let metadata_client: Arc<dyn MetadataClient> = let (auth_service, password_hasher) = auth::create()?;
if let Ok(tmdb_key) = std::env::var("TMDB_API_KEY") { let metadata_client = metadata::create()?;
Arc::new(MetadataClientImpl::new_tmdb(tmdb_key)) let poster_fetcher = poster_fetcher::create()?;
} else { let poster_storage = poster_storage::create()?;
let omdb_key = std::env::var("OMDB_API_KEY")
.context("Either TMDB_API_KEY or OMDB_API_KEY must be set")?;
Arc::new(MetadataClientImpl::new_omdb(omdb_key))
};
let poster_fetcher: Arc<dyn PosterFetcherClient> =
Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?);
let poster_storage: Arc<dyn PosterStorage> =
Arc::new(PosterStorageAdapter::from_config(storage_config));
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) = let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) =
match backend.as_str() { match backend.as_str() {
@@ -101,44 +71,26 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
#[cfg(feature = "federation")] #[cfg(feature = "federation")]
let (event_publisher_arc, ap_router, ap_service, social_query) = { let (event_publisher_arc, ap_router, ap_service, social_query) = {
let (federation_repo, social_query_arc, review_store): ( let (federation_repo, social_query_arc, review_store) = match &db_pool {
Arc<dyn activitypub::FederationRepository>,
Arc<dyn domain::ports::SocialQueryPort>,
Arc<dyn activitypub::RemoteReviewRepository>,
) = match &db_pool {
#[cfg(feature = "postgres-federation")] #[cfg(feature = "postgres-federation")]
DbPool::Postgres(pool) => { DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()),
let fed = Arc::new(PostgresFederationRepository::new(pool.clone()));
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
}
#[cfg(feature = "sqlite-federation")] #[cfg(feature = "sqlite-federation")]
DbPool::Sqlite(pool) => { DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()),
let fed = Arc::new(SqliteFederationRepository::new(pool.clone()));
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
}
#[cfg(not(feature = "sqlite-federation"))] #[cfg(not(feature = "sqlite-federation"))]
_ => anyhow::bail!("DATABASE_BACKEND={backend} federation is not supported by this build"), _ => anyhow::bail!("DATABASE_BACKEND={backend} federation is not supported by this build"),
}; };
let user_repo_adapter = Arc::new(DomainUserRepoAdapter(Arc::clone(&user_repository))); let ap = activitypub::wire(
let review_handler = Arc::new(ReviewObjectHandler { federation_repo,
movie_repository: Arc::clone(&movie_repository),
diary_repository: Arc::clone(&diary_repository),
review_store, review_store,
base_url: app_config.base_url.clone(), Arc::clone(&user_repository),
}); Arc::clone(&movie_repository),
let concrete_ap_service = Arc::new( Arc::clone(&review_repository),
ActivityPubService::new( Arc::clone(&diary_repository),
federation_repo, app_config.base_url.clone(),
user_repo_adapter, ).await?;
review_handler, let ap_router = ap.router;
app_config.base_url.clone(), let ap_service_arc = ap.service;
cfg!(debug_assertions),
)
.await?,
);
let ap_router = concrete_ap_service.router();
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
let ep: Arc<dyn EventPublisher> = match event_bus { let ep: Arc<dyn EventPublisher> = match event_bus {
EventBusBackend::Db => { EventBusBackend::Db => {

View File

@@ -2,27 +2,11 @@ use std::sync::Arc;
use anyhow::Context; use anyhow::Context;
use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService}; use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService};
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
use export::ExportAdapter; use export::ExportAdapter;
use metadata::MetadataClientImpl;
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
use poster_storage::{PosterStorageAdapter, StorageConfig};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[cfg(feature = "sqlite-federation")]
use sqlite_federation::SqliteFederationRepository;
#[cfg(feature = "postgres-federation")]
use postgres_federation::PostgresFederationRepository;
#[cfg(feature = "federation")] use domain::ports::{DiaryExporter, EventHandler};
use activitypub::{
ActivityPubEventHandler, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler,
};
use domain::ports::{
AuthService, DiaryExporter, EventHandler, MetadataClient,
PasswordHasher, PosterFetcherClient, PosterStorage,
};
#[cfg(not(any(feature = "sqlite", feature = "postgres")))] #[cfg(not(any(feature = "sqlite", feature = "postgres")))]
compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres"); compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres");
@@ -34,24 +18,12 @@ async fn main() -> anyhow::Result<()> {
let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?; let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string()); let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string());
let auth_config = AuthConfig::from_env()?;
let storage_config = StorageConfig::from_env()?;
let app_config = AppConfig::from_env(); let app_config = AppConfig::from_env();
let metadata_client: Arc<dyn MetadataClient> = let (auth_service, password_hasher) = auth::create()?;
if let Ok(tmdb_key) = std::env::var("TMDB_API_KEY") { let metadata_client = metadata::create()?;
Arc::new(MetadataClientImpl::new_tmdb(tmdb_key)) let poster_fetcher = poster_fetcher::create()?;
} else { let poster_storage = poster_storage::create()?;
let omdb_key = std::env::var("OMDB_API_KEY")
.context("Either TMDB_API_KEY or OMDB_API_KEY must be set")?;
Arc::new(MetadataClientImpl::new_omdb(omdb_key))
};
let poster_fetcher: Arc<dyn PosterFetcherClient> =
Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?);
let poster_storage: Arc<dyn PosterStorage> =
Arc::new(PosterStorageAdapter::from_config(storage_config));
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) = let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) =
match backend.as_str() { match backend.as_str() {
@@ -125,44 +97,22 @@ async fn main() -> anyhow::Result<()> {
#[cfg(feature = "federation")] #[cfg(feature = "federation")]
{ {
let (federation_repo, review_store): ( let (federation_repo, _social_query, review_store) = match &db_pool {
Arc<dyn activitypub::FederationRepository>,
Arc<dyn activitypub::RemoteReviewRepository>,
) = match &db_pool {
#[cfg(feature = "sqlite-federation")] #[cfg(feature = "sqlite-federation")]
DbPool::Sqlite(pool) => { DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()),
let fed = Arc::new(SqliteFederationRepository::new(pool.clone()));
(Arc::clone(&fed) as _, fed as _)
}
#[cfg(feature = "postgres-federation")] #[cfg(feature = "postgres-federation")]
DbPool::Postgres(pool) => { DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()),
let fed = Arc::new(PostgresFederationRepository::new(pool.clone()));
(Arc::clone(&fed) as _, fed as _)
}
}; };
let ap_service = Arc::new( let ap = activitypub::wire(
ActivityPubService::new( federation_repo,
federation_repo, review_store,
Arc::new(DomainUserRepoAdapter(fed_user_repo)), fed_user_repo,
Arc::new(ReviewObjectHandler {
movie_repository: Arc::clone(&fed_movie_repo),
diary_repository: fed_diary_repo,
review_store,
base_url: base_url.clone(),
}),
base_url.clone(),
cfg!(debug_assertions),
)
.await?,
);
let ap = Arc::new(ActivityPubEventHandler::new(
ap_service,
fed_movie_repo, fed_movie_repo,
fed_review_repo, fed_review_repo,
fed_diary_repo,
base_url, base_url,
)) as Arc<dyn EventHandler>; ).await?.event_handler;
tracing::info!("federation event handler registered"); tracing::info!("federation event handler registered");
vec![poster, ap] vec![poster, ap]