feat: migrate k-ap 0.1.10→0.3.1, fix AP gaps

- split FederationRepository into FollowRepository, ActorRepository, BlocklistRepository, ActivityRepository
- RemoteActor: 5 new fields (bio, banner_url, followers_url, following_url, also_known_as)
- ApObjectHandler split: get_local_objects_page/count_local_posts → ApContentReader trait
- builder API: positional args → named setters
- broadcast_create_note/update_note: add ApVisibility + mentioned_inboxes params
- backfill_outbox → import_remote_outbox
- ApUser: also_known_as Option<String> → Vec<String>, new fields

AP gaps fixed:
- add GET /users/{id}/followers + /following with content negotiation
- wire EventPublisher into builder via FederationEventBridge adapter
- add display_name field full stack (domain→DB→API→AP)
- DB-side outbox pagination (get_local_reviews_page)
- set featured_url on ApUser
This commit is contained in:
2026-05-29 10:42:53 +02:00
parent bace54c552
commit 624cfe5799
32 changed files with 1016 additions and 957 deletions

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use k_ap::ApObjectHandler;
use k_ap::{ApContentReader, ApObjectHandler};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use url::Url;
@@ -13,16 +13,7 @@ pub struct CompositeObjectHandler {
}
#[async_trait]
impl ApObjectHandler for CompositeObjectHandler {
async fn get_local_objects_for_user(
&self,
user_id: uuid::Uuid,
) -> anyhow::Result<Vec<(Url, serde_json::Value)>> {
let mut results = self.review.get_local_objects_for_user(user_id).await?;
results.extend(self.watchlist.get_local_objects_for_user(user_id).await?);
Ok(results)
}
impl ApContentReader for CompositeObjectHandler {
async fn get_local_objects_page(
&self,
user_id: uuid::Uuid,
@@ -34,6 +25,13 @@ impl ApObjectHandler for CompositeObjectHandler {
.await
}
async fn count_local_posts(&self) -> anyhow::Result<u64> {
self.review.count_local_posts().await
}
}
#[async_trait]
impl ApObjectHandler for CompositeObjectHandler {
async fn on_create(
&self,
ap_id: &Url,
@@ -77,10 +75,6 @@ impl ApObjectHandler for CompositeObjectHandler {
Ok(())
}
async fn count_local_posts(&self) -> anyhow::Result<u64> {
self.review.count_local_posts().await
}
async fn on_like(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}
@@ -89,6 +83,10 @@ impl ApObjectHandler for CompositeObjectHandler {
Ok(())
}
async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}
async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}

View File

@@ -8,7 +8,7 @@ use domain::{
};
use std::sync::Arc;
use k_ap::ActivityPubService;
use k_ap::{ActivityPubService, ApVisibility};
use crate::objects::review_to_ap_object;
use crate::urls::{actor_url, review_url};
@@ -170,7 +170,7 @@ impl ActivityPubEventHandler {
let json = serde_json::to_value(obj)?;
self.ap_service
.broadcast_update_note(user_id.value(), json)
.broadcast_update_note(user_id.value(), json, ApVisibility::Public, vec![])
.await?;
Ok(())
@@ -227,7 +227,7 @@ impl ActivityPubEventHandler {
let json = serde_json::to_value(obj)?;
self.ap_service
.broadcast_create_note(user_id.value(), json)
.broadcast_create_note(user_id.value(), json, ApVisibility::Public, vec![])
.await?;
Ok(())
}

View File

@@ -0,0 +1,45 @@
use std::sync::Arc;
use domain::events::DomainEvent;
use domain::value_objects::UserId;
use k_ap::FederationEvent;
pub struct FederationEventBridge {
domain_publisher: Arc<dyn domain::ports::EventPublisher>,
}
impl FederationEventBridge {
pub fn new(domain_publisher: Arc<dyn domain::ports::EventPublisher>) -> Self {
Self { domain_publisher }
}
}
#[async_trait::async_trait]
impl k_ap::EventPublisher for FederationEventBridge {
async fn publish(&self, event: FederationEvent) -> anyhow::Result<()> {
match event {
FederationEvent::BackfillRequested {
owner_user_id,
follower_inbox_url,
} => {
tracing::info!(
owner = %owner_user_id,
inbox = %follower_inbox_url,
"federation BackfillRequested → FollowAccepted"
);
self.domain_publisher
.publish(&DomainEvent::FollowAccepted {
local_user_id: UserId::from_uuid(owner_user_id),
remote_actor_url: follower_inbox_url.clone(),
outbox_url: follower_inbox_url,
})
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))
}
_ => {
tracing::debug!("ignoring federation event: {:?}", event);
Ok(())
}
}
}
}

View File

@@ -1,5 +1,6 @@
pub mod composite_handler;
pub mod event_handler;
pub mod federation_event_bridge;
pub mod objects;
pub mod port;
pub mod remote_review_repository;
@@ -10,8 +11,9 @@ pub mod watchlist_handler;
// Re-export the generic base types that callers need
pub use k_ap::{
ActivityPubService, ApFederationConfig, ApObjectHandler, ApUser, ApUserRepository,
FederationData, FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor,
ActivityPubService, ActivityRepository, ActorRepository, ApContentReader, ApFederationConfig,
ApObjectHandler, ApUser, ApUserRepository, BlocklistRepository, FederationData,
FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor,
};
pub use event_handler::ActivityPubEventHandler;
@@ -27,14 +29,17 @@ pub struct ActivityPubWire {
}
pub async fn wire(
federation_repo: std::sync::Arc<dyn FederationRepository>,
activity_repo: std::sync::Arc<dyn ActivityRepository>,
follow_repo: std::sync::Arc<dyn FollowRepository>,
actor_repo: std::sync::Arc<dyn ActorRepository>,
blocklist_repo: std::sync::Arc<dyn BlocklistRepository>,
review_store: std::sync::Arc<dyn RemoteReviewRepository>,
remote_watchlist_repo: std::sync::Arc<dyn domain::ports::RemoteWatchlistRepository>,
local_ap_content: std::sync::Arc<dyn domain::ports::LocalApContentQuery>,
user_repo: std::sync::Arc<dyn domain::ports::UserRepository>,
base_url: String,
allow_registration: bool,
_event_publisher: std::sync::Arc<dyn domain::ports::EventPublisher>,
event_publisher: std::sync::Arc<dyn domain::ports::EventPublisher>,
) -> anyhow::Result<ActivityPubWire> {
let review_handler = std::sync::Arc::new(ReviewObjectHandler {
content_query: std::sync::Arc::clone(&local_ap_content),
@@ -62,18 +67,28 @@ pub async fn wire(
);
}
let fed_event_bridge = std::sync::Arc::new(
federation_event_bridge::FederationEventBridge::new(event_publisher),
);
let concrete = std::sync::Arc::new(
ActivityPubService::builder(
federation_repo,
std::sync::Arc::new(DomainUserRepoAdapter::new(user_repo, base_url.clone())),
composite,
base_url.clone(),
)
.allow_registration(allow_registration)
.software_name("movies-diary")
.debug(federation_debug)
.build()
.await?,
ActivityPubService::builder(base_url.clone())
.activity_repo(activity_repo)
.follow_repo(follow_repo)
.actor_repo(actor_repo)
.blocklist_repo(blocklist_repo)
.user_repo(std::sync::Arc::new(DomainUserRepoAdapter::new(
user_repo,
base_url.clone(),
)))
.content_reader(composite.clone() as std::sync::Arc<dyn ApContentReader>)
.object_handler(composite as std::sync::Arc<dyn ApObjectHandler>)
.event_publisher(fed_event_bridge)
.allow_registration(allow_registration)
.software_name("movies-diary")
.debug(federation_debug)
.build()
.await?,
);
let router = concrete.router();

View File

@@ -31,7 +31,9 @@ pub trait ActivityPubPort: Send + Sync {
async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()>;
async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()>;
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>>;
async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>;
async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>;
async fn followers_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String>;
async fn following_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String>;
}
#[async_trait]
@@ -98,8 +100,14 @@ impl ActivityPubPort for ActivityPubService {
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> {
self.get_blocked_domains().await
}
async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
self.backfill_outbox(outbox_url, actor_url).await
async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
self.import_remote_outbox(outbox_url, actor_url).await
}
async fn followers_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String> {
self.followers_collection_json(user_id, page).await
}
async fn following_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String> {
self.following_collection_json(user_id, page).await
}
}
@@ -158,7 +166,13 @@ impl ActivityPubPort for NoopActivityPubService {
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> {
Ok(vec![])
}
async fn backfill_outbox(&self, _: &str, _: &str) -> anyhow::Result<()> {
async fn import_remote_outbox(&self, _: &str, _: &str) -> anyhow::Result<()> {
Ok(())
}
async fn followers_collection_json(&self, _: Uuid, _: Option<u32>) -> anyhow::Result<String> {
Ok(String::new())
}
async fn following_collection_json(&self, _: Uuid, _: Option<u32>) -> anyhow::Result<String> {
Ok(String::new())
}
}

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use k_ap::ApObjectHandler;
use k_ap::{ApContentReader, ApObjectHandler};
use async_trait::async_trait;
use domain::{
models::ReviewSource,
@@ -20,43 +20,7 @@ pub struct ReviewObjectHandler {
}
#[async_trait]
impl ApObjectHandler for ReviewObjectHandler {
async fn get_local_objects_for_user(
&self,
user_id: uuid::Uuid,
) -> anyhow::Result<Vec<(Url, serde_json::Value)>> {
let domain_user_id = UserId::from_uuid(user_id);
let entries = self
.content_query
.get_local_reviews_for_user(&domain_user_id)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let actor = actor_url(&self.base_url, user_id);
let mut results = Vec::new();
for entry in entries {
let review = entry.review();
let movie = entry.movie();
let ap_id = review_url(&self.base_url, review.id());
let poster_url = movie
.poster_path()
.map(|p| format!("{}/images/{}", self.base_url, p.value()));
let obj = review_to_ap_object(
review,
ap_id.clone(),
actor.clone(),
movie.title().value().to_string(),
movie.release_year().value(),
poster_url,
&self.base_url,
);
results.push((ap_id, serde_json::to_value(obj)?));
}
Ok(results)
}
impl ApContentReader for ReviewObjectHandler {
async fn get_local_objects_page(
&self,
user_id: uuid::Uuid,
@@ -64,9 +28,10 @@ impl ApObjectHandler for ReviewObjectHandler {
limit: usize,
) -> anyhow::Result<Vec<(url::Url, serde_json::Value, chrono::DateTime<chrono::Utc>)>> {
let domain_user_id = UserId::from_uuid(user_id);
let before_naive = before.map(|dt| dt.naive_utc());
let entries = self
.content_query
.get_local_reviews_for_user(&domain_user_id)
.get_local_reviews_page(&domain_user_id, before_naive, limit)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
@@ -75,13 +40,6 @@ impl ApObjectHandler for ReviewObjectHandler {
for entry in entries {
let review = entry.review();
let published = chrono::DateTime::from_naive_utc_and_offset(*review.watched_at(), chrono::Utc);
if let Some(cutoff) = before
&& published >= cutoff
{
continue;
}
let movie = entry.movie();
let ap_id = review_url(&self.base_url, review.id());
let poster_url = movie
@@ -98,14 +56,20 @@ impl ApObjectHandler for ReviewObjectHandler {
&self.base_url,
);
results.push((ap_id, serde_json::to_value(obj)?, published));
if results.len() >= limit {
break;
}
}
Ok(results)
}
async fn count_local_posts(&self) -> anyhow::Result<u64> {
self.content_query
.count_local_posts()
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))
}
}
#[async_trait]
impl ApObjectHandler for ReviewObjectHandler {
async fn on_create(
&self,
_ap_id: &Url,
@@ -200,13 +164,6 @@ impl ApObjectHandler for ReviewObjectHandler {
self.review_store.delete_by_actor(actor_url.as_str()).await
}
async fn count_local_posts(&self) -> anyhow::Result<u64> {
self.content_query
.count_local_posts()
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))
}
async fn on_like(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}
@@ -215,6 +172,10 @@ impl ApObjectHandler for ReviewObjectHandler {
Ok(())
}
async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}
async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}

View File

@@ -26,10 +26,14 @@ impl DomainUserRepoAdapter {
ApUser {
id: u.id().value(),
username: u.username().value().to_string(),
display_name: u.display_name().map(|s| s.to_string()),
bio: u.bio().map(|s| s.to_string()),
avatar_url,
banner_url,
also_known_as: u.also_known_as().map(|s| s.to_string()),
also_known_as: u
.also_known_as()
.map(|s| vec![s.to_string()])
.unwrap_or_default(),
profile_url,
attachment: u
.profile_fields()
@@ -39,6 +43,15 @@ impl DomainUserRepoAdapter {
value: f.value.clone(),
})
.collect(),
manually_approves_followers: true,
discoverable: true,
actor_type: Default::default(),
featured_url: Url::parse(&format!(
"{}/users/{}/featured",
self.base_url,
u.id().value()
))
.ok(),
}
}
}

View File

@@ -2,15 +2,13 @@ use std::sync::Arc;
use k_ap::ApObjectHandler;
use async_trait::async_trait;
use chrono::Utc;
use domain::{
models::RemoteWatchlistEntry,
ports::{LocalApContentQuery, RemoteWatchlistRepository},
value_objects::UserId,
};
use url::Url;
use crate::{objects::{WatchlistObject, watchlist_to_ap_object}, urls::{actor_url, watchlist_entry_url}};
use crate::objects::WatchlistObject;
pub struct WatchlistObjectHandler {
pub remote_watchlist_repo: Arc<dyn RemoteWatchlistRepository>,
@@ -20,55 +18,6 @@ pub struct WatchlistObjectHandler {
#[async_trait]
impl ApObjectHandler for WatchlistObjectHandler {
async fn get_local_objects_for_user(
&self,
user_id: uuid::Uuid,
) -> anyhow::Result<Vec<(Url, serde_json::Value)>> {
let domain_user_id = UserId::from_uuid(user_id);
let entries = self
.content_query
.get_local_watchlist_for_user(&domain_user_id)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let actor = actor_url(&self.base_url, user_id);
let mut results = Vec::new();
for wm in entries {
let movie_id = wm.entry.movie_id.value();
let ap_id = watchlist_entry_url(&self.base_url, user_id, movie_id);
let added_at = chrono::DateTime::from_naive_utc_and_offset(wm.entry.added_at, Utc);
let external_metadata_id = wm
.movie
.external_metadata_id()
.map(|id| id.value().to_string());
let poster_url = wm
.movie
.poster_path()
.map(|p| format!("{}/images/{}", self.base_url, p.value()));
let obj = watchlist_to_ap_object(
ap_id.clone(),
actor.clone(),
wm.movie.title().value().to_string(),
wm.movie.release_year().value(),
external_metadata_id,
poster_url,
added_at,
&self.base_url,
);
results.push((ap_id, serde_json::to_value(obj)?));
}
Ok(results)
}
async fn get_local_objects_page(
&self,
_user_id: uuid::Uuid,
_before: Option<chrono::DateTime<Utc>>,
_limit: usize,
) -> anyhow::Result<Vec<(Url, serde_json::Value, chrono::DateTime<Utc>)>> {
Ok(vec![])
}
async fn on_create(
&self,
ap_id: &Url,
@@ -115,10 +64,6 @@ impl ApObjectHandler for WatchlistObjectHandler {
Ok(())
}
async fn count_local_posts(&self) -> anyhow::Result<u64> {
Ok(0)
}
async fn on_like(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}
@@ -127,6 +72,10 @@ impl ApObjectHandler for WatchlistObjectHandler {
Ok(())
}
async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}
async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
Ok(())
}