activity-pub implementation

This commit is contained in:
2026-05-08 21:26:50 +02:00
parent 940c33047c
commit df71748897
50 changed files with 2724 additions and 97 deletions

View File

@@ -26,11 +26,13 @@ metadata = { workspace = true }
poster-fetcher = { workspace = true }
poster-storage = { workspace = true }
sqlite = { workspace = true }
activitypub = { workspace = true }
sqlx = { workspace = true }
template-askama = { workspace = true }
event-publisher = { workspace = true }
rss = { workspace = true }
infer = "0.19.0"
percent-encoding = "2"
[dev-dependencies]
tower = { version = "0.5", features = ["util"] }

View File

@@ -234,11 +234,22 @@ impl From<DiaryQueryParams> for GetDiaryQuery {
}
}
#[derive(Deserialize)]
pub struct FollowForm {
pub handle: String,
}
#[derive(Deserialize)]
pub struct UnfollowForm {
pub actor_url: String,
}
#[derive(serde::Deserialize, Default)]
pub struct ProfileQueryParams {
pub view: Option<String>,
pub limit: Option<u32>,
pub offset: Option<u32>,
pub error: Option<String>,
}
#[cfg(test)]

View File

@@ -149,6 +149,7 @@ mod tests {
fn render_activity_feed_page(&self, _: application::ports::ActivityFeedPageData) -> Result<String, String> { panic!() }
fn render_users_page(&self, _: application::ports::UsersPageData) -> Result<String, String> { panic!() }
fn render_profile_page(&self, _: application::ports::ProfilePageData) -> Result<String, String> { panic!() }
fn render_following_page(&self, _: application::ports::FollowingPageData) -> Result<String, String> { panic!() }
}
struct PanicRssRenderer;
@@ -179,6 +180,7 @@ mod tests {
},
html_renderer: Arc::new(PanicRenderer),
rss_renderer: Arc::new(PanicRssRenderer),
ap_service: test_ap_service().await,
};
let app = test_router(state);
@@ -228,7 +230,33 @@ mod tests {
}
}
fn panic_state() -> crate::state::AppState {
async fn test_ap_service() -> std::sync::Arc<activitypub::ActivityPubService> {
use std::sync::Arc;
let pool = sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_keypairs (user_id TEXT PRIMARY KEY, public_key TEXT NOT NULL, private_key TEXT NOT NULL)")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_remote_actors (url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT)")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_followers (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', PRIMARY KEY (local_user_id, remote_actor_url))")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))")
.execute(&pool).await.unwrap();
let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool));
struct DummyUserRepo;
#[async_trait::async_trait] impl domain::ports::UserRepository for DummyUserRepo {
async fn find_by_email(&self, _: &domain::value_objects::Email) -> Result<Option<domain::models::User>, domain::errors::DomainError> { Ok(None) }
async fn save(&self, _: &domain::models::User) -> Result<(), domain::errors::DomainError> { Ok(()) }
async fn find_by_id(&self, _: &domain::value_objects::UserId) -> Result<Option<domain::models::User>, domain::errors::DomainError> { Ok(None) }
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, domain::errors::DomainError> { Ok(vec![]) }
}
Arc::new(
activitypub::ActivityPubService::new(fed_repo, Arc::new(DummyUserRepo), "http://localhost:3000".to_string(), true)
.await
.unwrap(),
)
}
async fn panic_state() -> crate::state::AppState {
use std::sync::Arc;
use application::context::AppContext;
struct PanicRepo2;
@@ -266,6 +294,7 @@ mod tests {
fn render_activity_feed_page(&self, _: application::ports::ActivityFeedPageData) -> Result<String, String> { panic!() }
fn render_users_page(&self, _: application::ports::UsersPageData) -> Result<String, String> { panic!() }
fn render_profile_page(&self, _: application::ports::ProfilePageData) -> Result<String, String> { panic!() }
fn render_following_page(&self, _: application::ports::FollowingPageData) -> Result<String, String> { panic!() }
}
struct PanicRssRenderer2;
impl crate::ports::RssFeedRenderer for PanicRssRenderer2 {
@@ -286,10 +315,11 @@ mod tests {
},
html_renderer: Arc::new(PanicRenderer2),
rss_renderer: Arc::new(PanicRssRenderer2),
ap_service: test_ap_service().await,
}
}
fn rejecting_state() -> crate::state::AppState {
async fn rejecting_state() -> crate::state::AppState {
use std::sync::Arc;
use application::context::AppContext;
struct PanicRepo3;
@@ -326,6 +356,7 @@ mod tests {
fn render_activity_feed_page(&self, _: application::ports::ActivityFeedPageData) -> Result<String, String> { panic!() }
fn render_users_page(&self, _: application::ports::UsersPageData) -> Result<String, String> { panic!() }
fn render_profile_page(&self, _: application::ports::ProfilePageData) -> Result<String, String> { panic!() }
fn render_following_page(&self, _: application::ports::FollowingPageData) -> Result<String, String> { panic!() }
}
struct PanicRssRenderer3;
impl crate::ports::RssFeedRenderer for PanicRssRenderer3 {
@@ -345,12 +376,13 @@ mod tests {
},
html_renderer: Arc::new(PanicRenderer3),
rss_renderer: Arc::new(PanicRssRenderer3),
ap_service: test_ap_service().await,
}
}
#[tokio::test]
async fn optional_cookie_user_returns_none_without_cookie() {
let app = test_router_optional(panic_state());
let app = test_router_optional(panic_state().await);
let response = app
.oneshot(Request::builder().uri("/optional").body(Body::empty()).unwrap())
.await
@@ -362,7 +394,7 @@ mod tests {
#[tokio::test]
async fn optional_cookie_user_returns_none_with_invalid_token() {
let app = test_router_optional(rejecting_state());
let app = test_router_optional(rejecting_state().await);
let response = app
.oneshot(
Request::builder()
@@ -380,7 +412,7 @@ mod tests {
#[tokio::test]
async fn required_cookie_user_redirects_without_cookie() {
let app = test_router_required(panic_state());
let app = test_router_required(panic_state().await);
let response = app
.oneshot(Request::builder().uri("/required").body(Body::empty()).unwrap())
.await
@@ -391,7 +423,7 @@ mod tests {
#[tokio::test]
async fn required_cookie_user_redirects_with_invalid_token() {
let app = test_router_required(rejecting_state());
let app = test_router_required(rejecting_state().await);
let response = app
.oneshot(
Request::builder()

View File

@@ -13,13 +13,13 @@ pub mod html {
use application::{
commands::{DeleteReviewCommand, LoginCommand, RegisterCommand},
ports::{HtmlPageContext, LoginPageData, NewReviewPageData, RegisterPageData},
ports::{FollowingPageData, HtmlPageContext, LoginPageData, NewReviewPageData, RegisterPageData, RemoteActorView},
use_cases::{delete_review, log_review, login as login_uc, register as register_uc},
};
use domain::{errors::DomainError, value_objects::UserId};
use crate::{
dtos::{DiaryQueryParams, ErrorQuery, LoginForm, LogReviewData, LogReviewForm, RegisterForm},
dtos::{DiaryQueryParams, ErrorQuery, FollowForm, LoginForm, LogReviewData, LogReviewForm, RegisterForm, UnfollowForm},
extractors::{OptionalCookieUser, RequiredCookieUser},
state::AppState,
};
@@ -49,10 +49,8 @@ pub mod html {
}
fn encode_error(msg: &str) -> String {
msg.replace(' ', "+")
.replace('&', "%26")
.replace('=', "%3D")
.replace('"', "%22")
use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
utf8_percent_encode(msg, NON_ALPHANUMERIC).to_string()
}
fn secure_flag() -> &'static str {
@@ -298,7 +296,7 @@ pub mod html {
Path(profile_user_uuid): Path<Uuid>,
Query(params): Query<crate::dtos::ProfileQueryParams>,
) -> impl IntoResponse {
let mut ctx = build_page_context(&state, user_id).await;
let mut ctx = build_page_context(&state, user_id.clone()).await;
let view = params.view.unwrap_or_else(|| "recent".to_string());
let profile_user = match state.app_ctx.user_repository
@@ -315,6 +313,21 @@ pub mod html {
ctx.page_title = format!("{}'s Diary — Movies Diary", display_name);
ctx.canonical_url = format!("{}/users/{}", state.app_ctx.config.base_url, profile_user_uuid);
let is_own_profile = user_id.as_ref()
.map(|u| u.value() == profile_user_uuid)
.unwrap_or(false);
let following_count = if is_own_profile {
if let Some(ref uid) = user_id {
state.ap_service.count_following(uid.clone()).await
.unwrap_or(0)
} else {
0
}
} else {
0
};
let query = application::queries::GetUserProfileQuery {
user_id: profile_user_uuid,
view: view.clone(),
@@ -343,6 +356,9 @@ pub mod html {
limit,
history: profile.history,
trends: profile.trends,
is_own_profile,
error: params.error,
following_count,
};
match state.html_renderer.render_profile_page(data) {
Ok(html) => Html(html).into_response(),
@@ -352,6 +368,80 @@ pub mod html {
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
pub async fn follow_remote_user(
RequiredCookieUser(user_id): RequiredCookieUser,
State(state): State<AppState>,
Path(profile_user_uuid): Path<Uuid>,
Form(form): Form<FollowForm>,
) -> impl IntoResponse {
if user_id.value() != profile_user_uuid {
return StatusCode::FORBIDDEN.into_response();
}
match state.ap_service.follow(user_id.clone(), &form.handle).await {
Ok(()) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(),
Err(e) => {
tracing::error!("follow error: {:?}", e);
let msg = encode_error(&e.to_string());
Redirect::to(&format!("/users/{}?error={}", profile_user_uuid, msg)).into_response()
}
}
}
pub async fn unfollow_remote_user(
RequiredCookieUser(user_id): RequiredCookieUser,
State(state): State<AppState>,
Path(profile_user_uuid): Path<Uuid>,
Form(form): Form<UnfollowForm>,
) -> impl IntoResponse {
if user_id.value() != profile_user_uuid {
return StatusCode::FORBIDDEN.into_response();
}
match state.ap_service.unfollow(user_id.clone(), &form.actor_url).await {
Ok(()) => Redirect::to(&format!("/users/{}/following-list", profile_user_uuid)).into_response(),
Err(e) => {
let msg = encode_error(&e.to_string());
Redirect::to(&format!("/users/{}/following-list?error={}", profile_user_uuid, msg)).into_response()
}
}
}
pub async fn get_following_page(
RequiredCookieUser(user_id): RequiredCookieUser,
State(state): State<AppState>,
Path(profile_user_uuid): Path<Uuid>,
Query(params): Query<crate::dtos::ErrorQuery>,
) -> impl IntoResponse {
if user_id.value() != profile_user_uuid {
return StatusCode::FORBIDDEN.into_response();
}
let mut ctx = build_page_context(&state, Some(user_id.clone())).await;
ctx.page_title = "Following — Movies Diary".to_string();
ctx.canonical_url = format!("{}/users/{}/following-list", state.app_ctx.config.base_url, profile_user_uuid);
match state.ap_service.get_following(user_id).await {
Ok(following) => {
let actors = following.into_iter().map(|a| RemoteActorView {
handle: a.handle,
display_name: a.display_name,
url: a.url,
}).collect();
let data = FollowingPageData {
ctx,
user_id: profile_user_uuid,
actors,
error: params.error,
};
match state.html_renderer.render_following_page(data) {
Ok(html) => Html(html).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e).into_response(),
}
}
Err(e) => {
tracing::error!("get_following error: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to load following list").into_response()
}
}
}
}
pub mod posters {

View File

@@ -15,7 +15,8 @@ use auth::{AuthConfig, Argon2PasswordHasher, JwtAuthService};
use metadata::MetadataClientImpl;
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
use poster_storage::{PosterStorageAdapter, StorageConfig};
use sqlite::{SqliteMovieRepository, SqliteUserRepository};
use activitypub::ActivityPubService;
use sqlite::{SqliteFederationRepository, SqliteMovieRepository, SqliteUserRepository};
use rss::RssAdapter;
use template_askama::AskamaHtmlRenderer;
@@ -70,7 +71,7 @@ async fn wire_dependencies() -> anyhow::Result<AppState> {
PosterFetcherClient, PosterStorage, UserRepository,
};
let repository: Arc<dyn MovieRepository> = Arc::new(movie_repo);
let user_repository: Arc<dyn UserRepository> = Arc::new(SqliteUserRepository::new(pool));
let user_repository: Arc<dyn UserRepository> = Arc::new(SqliteUserRepository::new(pool.clone()));
let metadata_client: Arc<dyn MetadataClient> = Arc::new(MetadataClientImpl::new_omdb(omdb_api_key));
let poster_fetcher: Arc<dyn PosterFetcherClient> = Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?);
let poster_storage: Arc<dyn PosterStorage> = Arc::new(PosterStorageAdapter::from_config(storage_config)?);
@@ -91,10 +92,23 @@ async fn wire_dependencies() -> anyhow::Result<AppState> {
config: app_config.clone(),
};
// Federation
let federation_repo = Arc::new(SqliteFederationRepository::new(pool));
let ap_service = Arc::new(
ActivityPubService::new(
federation_repo,
Arc::clone(&user_repository),
app_config.base_url.clone(),
cfg!(debug_assertions),
)
.await?,
);
let ap_event_handler = ap_service.event_handler();
let poster_handler = PosterSyncHandler::new(handler_ctx, 3);
let (event_publisher, event_worker) = create_event_channel(
EventPublisherConfig::from_env(),
vec![Box::new(poster_handler)],
vec![Box::new(poster_handler), Box::new(ap_event_handler)],
);
tokio::spawn(event_worker.run());
@@ -116,6 +130,7 @@ async fn wire_dependencies() -> anyhow::Result<AppState> {
rss_renderer: Arc::new(RssAdapter::new(
std::env::var("BASE_URL").unwrap_or_else(|_| "http://localhost:3000".into()),
)),
ap_service,
})
}

View File

@@ -9,6 +9,21 @@ use tower_http::{services::ServeDir, trace::TraceLayer};
use crate::{handlers, state::AppState};
/// Build an ActivityPub router from the service, excluding routes that
/// conflict with HTML routes (/users/{id} and /users/{id}/following).
/// Those AP endpoints are still served via the federation middleware layer
/// applied to the whole AP router scope; the conflicting paths will need
/// content-negotiation wrappers added in Phase 5.
fn ap_routes(state: &AppState) -> Router {
let config = state.ap_service.federation_config();
Router::new()
.route("/.well-known/webfinger", routing::get(activitypub::webfinger::webfinger_handler))
.route("/users/{user_id}/inbox", routing::post(activitypub::inbox::inbox_handler))
.route("/users/{user_id}/outbox", routing::get(activitypub::outbox::outbox_handler))
.route("/users/{user_id}/followers", routing::get(activitypub::followers_handler::followers_handler))
.layer(config.middleware())
}
/// Simple global rate limiter: tracks request count per 60-second window.
/// Not per-IP — suitable for a low-traffic personal app.
#[derive(Clone)]
@@ -47,12 +62,14 @@ impl RateLimiter {
pub fn build_router(state: AppState) -> Router {
let rate_limit = state.app_ctx.config.rate_limit;
let ap_router = ap_routes(&state);
Router::new()
.merge(html_routes(rate_limit))
.merge(api_routes(rate_limit))
.nest_service("/static", ServeDir::new("static"))
.layer(TraceLayer::new_for_http())
.with_state(state)
.merge(ap_router)
}
fn html_routes(rate_limit: u64) -> Router<AppState> {
@@ -87,6 +104,18 @@ fn html_routes(rate_limit: u64) -> Router<AppState> {
"/users/{id}",
routing::get(handlers::html::get_user_profile),
)
.route(
"/users/{id}/follow",
routing::post(handlers::html::follow_remote_user),
)
.route(
"/users/{id}/unfollow",
routing::post(handlers::html::unfollow_remote_user),
)
.route(
"/users/{id}/following-list",
routing::get(handlers::html::get_following_page),
)
.merge(auth)
.route(
"/reviews/new",

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use activitypub::ActivityPubService;
use application::context::AppContext;
use crate::ports::{HtmlRenderer, RssFeedRenderer};
@@ -9,4 +10,5 @@ pub struct AppState {
pub app_ctx: AppContext,
pub html_renderer: Arc<dyn HtmlRenderer>,
pub rss_renderer: Arc<dyn RssFeedRenderer>,
pub ap_service: Arc<ActivityPubService>,
}

View File

@@ -88,6 +88,32 @@ impl UserRepository for NobodyUserRepo {
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> { panic!() }
}
async fn test_ap_service() -> Arc<activitypub::ActivityPubService> {
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_keypairs (user_id TEXT PRIMARY KEY, public_key TEXT NOT NULL, private_key TEXT NOT NULL)")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_remote_actors (url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT)")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_followers (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', PRIMARY KEY (local_user_id, remote_actor_url))")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))")
.execute(&pool).await.unwrap();
let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool));
struct DummyUserRepo;
#[async_trait]
impl UserRepository for DummyUserRepo {
async fn find_by_email(&self, _: &Email) -> Result<Option<User>, DomainError> { Ok(None) }
async fn save(&self, _: &User) -> Result<(), DomainError> { Ok(()) }
async fn find_by_id(&self, _: &UserId) -> Result<Option<User>, DomainError> { Ok(None) }
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> { Ok(vec![]) }
}
Arc::new(
activitypub::ActivityPubService::new(fed_repo, Arc::new(DummyUserRepo), "http://localhost:3000".to_string(), true)
.await
.unwrap(),
)
}
async fn test_app() -> Router {
let pool = SqlitePool::connect("sqlite::memory:")
.await
@@ -109,6 +135,7 @@ async fn test_app() -> Router {
},
html_renderer: Arc::new(AskamaHtmlRenderer::new()),
rss_renderer: Arc::new(RssAdapter::new("http://localhost:3000".into())),
ap_service: test_ap_service().await,
};
routes::build_router(state)