fix: AP bugs — backfill mapping, review activity type, also_known_as parse

- BackfillRequested now maps to BackfillFollower domain event (not FollowAccepted);
  worker calls run_backfill_for_follower to send LOCAL content to new follower inbox,
  instead of incorrectly trying to import from an inbox URL as if it were an outbox
- reviews broadcast as Create activity instead of Add (semantically correct)
- also_known_as JSON parse failure logs warning + preserves raw string as single-element
  vec instead of silently returning empty
This commit is contained in:
2026-05-29 10:54:11 +02:00
parent 624cfe5799
commit 36d15e1344
9 changed files with 71 additions and 25 deletions

View File

@@ -123,7 +123,7 @@ impl ActivityPubEventHandler {
let json = serde_json::to_value(obj)?; let json = serde_json::to_value(obj)?;
self.ap_service self.ap_service
.broadcast_add_to_followers(user_id.value(), ap_id, json) .broadcast_create_note(user_id.value(), json, ApVisibility::Public, vec![])
.await?; .await?;
Ok(()) Ok(())

View File

@@ -22,16 +22,10 @@ impl k_ap::EventPublisher for FederationEventBridge {
owner_user_id, owner_user_id,
follower_inbox_url, follower_inbox_url,
} => { } => {
tracing::info!(
owner = %owner_user_id,
inbox = %follower_inbox_url,
"federation BackfillRequested → FollowAccepted"
);
self.domain_publisher self.domain_publisher
.publish(&DomainEvent::FollowAccepted { .publish(&DomainEvent::BackfillFollower {
local_user_id: UserId::from_uuid(owner_user_id), owner_user_id: UserId::from_uuid(owner_user_id),
remote_actor_url: follower_inbox_url.clone(), follower_inbox_url,
outbox_url: follower_inbox_url,
}) })
.await .await
.map_err(|e| anyhow::anyhow!(e.to_string())) .map_err(|e| anyhow::anyhow!(e.to_string()))

View File

@@ -34,6 +34,7 @@ pub trait ActivityPubPort: Send + Sync {
async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>; async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>;
async fn followers_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String>; async fn followers_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String>;
async fn following_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String>; async fn following_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String>;
async fn run_backfill_for_follower(&self, owner_user_id: Uuid, follower_inbox_url: String) -> anyhow::Result<()>;
} }
#[async_trait] #[async_trait]
@@ -109,6 +110,9 @@ impl ActivityPubPort for ActivityPubService {
async fn following_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String> { async fn following_collection_json(&self, user_id: Uuid, page: Option<u32>) -> anyhow::Result<String> {
self.following_collection_json(user_id, page).await self.following_collection_json(user_id, page).await
} }
async fn run_backfill_for_follower(&self, owner_user_id: Uuid, follower_inbox_url: String) -> anyhow::Result<()> {
self.run_backfill_for_follower(owner_user_id, follower_inbox_url).await
}
} }
pub struct NoopActivityPubService; pub struct NoopActivityPubService;
@@ -175,4 +179,7 @@ impl ActivityPubPort for NoopActivityPubService {
async fn following_collection_json(&self, _: Uuid, _: Option<u32>) -> anyhow::Result<String> { async fn following_collection_json(&self, _: Uuid, _: Option<u32>) -> anyhow::Result<String> {
Ok(String::new()) Ok(String::new())
} }
async fn run_backfill_for_follower(&self, _: Uuid, _: String) -> anyhow::Result<()> {
Ok(())
}
} }

View File

@@ -63,6 +63,10 @@ pub enum EventPayload {
remote_actor_url: String, remote_actor_url: String,
outbox_url: String, outbox_url: String,
}, },
BackfillFollower {
owner_user_id: String,
follower_inbox_url: String,
},
} }
impl EventPayload { impl EventPayload {
@@ -79,6 +83,7 @@ impl EventPayload {
EventPayload::WatchlistEntryAdded { .. } => "WatchlistEntryAdded", EventPayload::WatchlistEntryAdded { .. } => "WatchlistEntryAdded",
EventPayload::WatchlistEntryRemoved { .. } => "WatchlistEntryRemoved", EventPayload::WatchlistEntryRemoved { .. } => "WatchlistEntryRemoved",
EventPayload::FollowAccepted { .. } => "FollowAccepted", EventPayload::FollowAccepted { .. } => "FollowAccepted",
EventPayload::BackfillFollower { .. } => "BackfillFollower",
} }
} }
} }
@@ -181,6 +186,13 @@ impl From<&DomainEvent> for EventPayload {
remote_actor_url: remote_actor_url.clone(), remote_actor_url: remote_actor_url.clone(),
outbox_url: outbox_url.clone(), outbox_url: outbox_url.clone(),
}, },
DomainEvent::BackfillFollower {
owner_user_id,
follower_inbox_url,
} => EventPayload::BackfillFollower {
owner_user_id: owner_user_id.value().to_string(),
follower_inbox_url: follower_inbox_url.clone(),
},
} }
} }
} }
@@ -281,6 +293,13 @@ impl TryFrom<EventPayload> for DomainEvent {
remote_actor_url, remote_actor_url,
outbox_url, outbox_url,
}), }),
EventPayload::BackfillFollower {
owner_user_id,
follower_inbox_url,
} => Ok(DomainEvent::BackfillFollower {
owner_user_id: UserId::from_uuid(parse_uuid(&owner_user_id, "owner_user_id")?),
follower_inbox_url,
}),
} }
} }
} }

View File

@@ -13,6 +13,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
DomainEvent::WatchlistEntryAdded { .. } => "watchlist.entry.added", DomainEvent::WatchlistEntryAdded { .. } => "watchlist.entry.added",
DomainEvent::WatchlistEntryRemoved { .. } => "watchlist.entry.removed", DomainEvent::WatchlistEntryRemoved { .. } => "watchlist.entry.removed",
DomainEvent::FollowAccepted { .. } => "follow.accepted", DomainEvent::FollowAccepted { .. } => "follow.accepted",
DomainEvent::BackfillFollower { .. } => "backfill.follower",
}; };
format!("{prefix}.{suffix}") format!("{prefix}.{suffix}")
} }

View File

@@ -58,7 +58,12 @@ fn pg_remote_actor(row: &sqlx::postgres::PgRow, url_col: &str) -> RemoteActor {
.try_get::<Option<String>, _>("also_known_as") .try_get::<Option<String>, _>("also_known_as")
.ok() .ok()
.flatten() .flatten()
.map(|s| serde_json::from_str(&s).unwrap_or_default()) .map(|s| {
serde_json::from_str::<Vec<String>>(&s).unwrap_or_else(|e| {
tracing::warn!(raw = %s, error = %e, "failed to parse also_known_as JSON");
vec![s]
})
})
.unwrap_or_default(), .unwrap_or_default(),
} }
} }

View File

@@ -58,7 +58,12 @@ fn remote_actor_from_row(row: &sqlx::sqlite::SqliteRow, url_col: &str) -> Remote
.try_get::<Option<String>, _>("also_known_as") .try_get::<Option<String>, _>("also_known_as")
.ok() .ok()
.flatten() .flatten()
.map(|s| serde_json::from_str(&s).unwrap_or_default()) .map(|s| {
serde_json::from_str::<Vec<String>>(&s).unwrap_or_else(|e| {
tracing::warn!(raw = %s, error = %e, "failed to parse also_known_as JSON");
vec![s]
})
})
.unwrap_or_default(), .unwrap_or_default(),
} }
} }

View File

@@ -61,6 +61,10 @@ pub enum DomainEvent {
remote_actor_url: String, remote_actor_url: String,
outbox_url: String, outbox_url: String,
}, },
BackfillFollower {
owner_user_id: UserId,
follower_inbox_url: String,
},
} }
#[async_trait] #[async_trait]

View File

@@ -10,18 +10,29 @@ pub struct FollowBackfillHandler {
#[async_trait] #[async_trait]
impl EventHandler for FollowBackfillHandler { impl EventHandler for FollowBackfillHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
let DomainEvent::FollowAccepted { match event {
remote_actor_url, DomainEvent::FollowAccepted {
outbox_url, remote_actor_url,
.. outbox_url,
} = event ..
else { } => {
return Ok(()); tracing::info!(actor = %remote_actor_url, outbox = %outbox_url, "importing remote outbox");
}; self.ap_service
tracing::info!(actor = %remote_actor_url, outbox = %outbox_url, "starting outbox backfill"); .import_remote_outbox(outbox_url, remote_actor_url)
self.ap_service .await
.import_remote_outbox(outbox_url, remote_actor_url) .map_err(|e| DomainError::InfrastructureError(e.to_string()))
.await }
.map_err(|e| DomainError::InfrastructureError(e.to_string())) DomainEvent::BackfillFollower {
owner_user_id,
follower_inbox_url,
} => {
tracing::info!(owner = %owner_user_id.value(), inbox = %follower_inbox_url, "backfilling local content to new follower");
self.ap_service
.run_backfill_for_follower(owner_user_id.value(), follower_inbox_url.clone())
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
_ => Ok(()),
}
} }
} }