diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 322184a..dc4d290 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -123,7 +123,7 @@ impl ActivityPubEventHandler { let json = serde_json::to_value(obj)?; self.ap_service - .broadcast_add_to_followers(user_id.value(), ap_id, json) + .broadcast_create_note(user_id.value(), json, ApVisibility::Public, vec![]) .await?; Ok(()) diff --git a/crates/adapters/activitypub/src/federation_event_bridge.rs b/crates/adapters/activitypub/src/federation_event_bridge.rs index 6a47f2a..2f9ca53 100644 --- a/crates/adapters/activitypub/src/federation_event_bridge.rs +++ b/crates/adapters/activitypub/src/federation_event_bridge.rs @@ -22,16 +22,10 @@ impl k_ap::EventPublisher for FederationEventBridge { owner_user_id, follower_inbox_url, } => { - tracing::info!( - owner = %owner_user_id, - inbox = %follower_inbox_url, - "federation BackfillRequested → FollowAccepted" - ); self.domain_publisher - .publish(&DomainEvent::FollowAccepted { - local_user_id: UserId::from_uuid(owner_user_id), - remote_actor_url: follower_inbox_url.clone(), - outbox_url: follower_inbox_url, + .publish(&DomainEvent::BackfillFollower { + owner_user_id: UserId::from_uuid(owner_user_id), + follower_inbox_url, }) .await .map_err(|e| anyhow::anyhow!(e.to_string())) diff --git a/crates/adapters/activitypub/src/port.rs b/crates/adapters/activitypub/src/port.rs index bcbca40..1271e94 100644 --- a/crates/adapters/activitypub/src/port.rs +++ b/crates/adapters/activitypub/src/port.rs @@ -34,6 +34,7 @@ pub trait ActivityPubPort: Send + Sync { async fn import_remote_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>; async fn followers_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result; async fn following_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result; + async fn run_backfill_for_follower(&self, owner_user_id: Uuid, follower_inbox_url: String) -> anyhow::Result<()>; } #[async_trait] @@ -109,6 +110,9 @@ impl ActivityPubPort for ActivityPubService { async fn following_collection_json(&self, user_id: Uuid, page: Option) -> anyhow::Result { self.following_collection_json(user_id, page).await } + async fn run_backfill_for_follower(&self, owner_user_id: Uuid, follower_inbox_url: String) -> anyhow::Result<()> { + self.run_backfill_for_follower(owner_user_id, follower_inbox_url).await + } } pub struct NoopActivityPubService; @@ -175,4 +179,7 @@ impl ActivityPubPort for NoopActivityPubService { async fn following_collection_json(&self, _: Uuid, _: Option) -> anyhow::Result { Ok(String::new()) } + async fn run_backfill_for_follower(&self, _: Uuid, _: String) -> anyhow::Result<()> { + Ok(()) + } } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index c0a351e..1372e0e 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -63,6 +63,10 @@ pub enum EventPayload { remote_actor_url: String, outbox_url: String, }, + BackfillFollower { + owner_user_id: String, + follower_inbox_url: String, + }, } impl EventPayload { @@ -79,6 +83,7 @@ impl EventPayload { EventPayload::WatchlistEntryAdded { .. } => "WatchlistEntryAdded", EventPayload::WatchlistEntryRemoved { .. } => "WatchlistEntryRemoved", EventPayload::FollowAccepted { .. } => "FollowAccepted", + EventPayload::BackfillFollower { .. } => "BackfillFollower", } } } @@ -181,6 +186,13 @@ impl From<&DomainEvent> for EventPayload { remote_actor_url: remote_actor_url.clone(), outbox_url: outbox_url.clone(), }, + DomainEvent::BackfillFollower { + owner_user_id, + follower_inbox_url, + } => EventPayload::BackfillFollower { + owner_user_id: owner_user_id.value().to_string(), + follower_inbox_url: follower_inbox_url.clone(), + }, } } } @@ -281,6 +293,13 @@ impl TryFrom for DomainEvent { remote_actor_url, outbox_url, }), + EventPayload::BackfillFollower { + owner_user_id, + follower_inbox_url, + } => Ok(DomainEvent::BackfillFollower { + owner_user_id: UserId::from_uuid(parse_uuid(&owner_user_id, "owner_user_id")?), + follower_inbox_url, + }), } } } diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index e3691e0..bba796b 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -13,6 +13,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::WatchlistEntryAdded { .. } => "watchlist.entry.added", DomainEvent::WatchlistEntryRemoved { .. } => "watchlist.entry.removed", DomainEvent::FollowAccepted { .. } => "follow.accepted", + DomainEvent::BackfillFollower { .. } => "backfill.follower", }; format!("{prefix}.{suffix}") } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index 085d6a1..2bc4a3d 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -58,7 +58,12 @@ fn pg_remote_actor(row: &sqlx::postgres::PgRow, url_col: &str) -> RemoteActor { .try_get::, _>("also_known_as") .ok() .flatten() - .map(|s| serde_json::from_str(&s).unwrap_or_default()) + .map(|s| { + serde_json::from_str::>(&s).unwrap_or_else(|e| { + tracing::warn!(raw = %s, error = %e, "failed to parse also_known_as JSON"); + vec![s] + }) + }) .unwrap_or_default(), } } diff --git a/crates/adapters/sqlite-federation/src/lib.rs b/crates/adapters/sqlite-federation/src/lib.rs index dd00272..77717ce 100644 --- a/crates/adapters/sqlite-federation/src/lib.rs +++ b/crates/adapters/sqlite-federation/src/lib.rs @@ -58,7 +58,12 @@ fn remote_actor_from_row(row: &sqlx::sqlite::SqliteRow, url_col: &str) -> Remote .try_get::, _>("also_known_as") .ok() .flatten() - .map(|s| serde_json::from_str(&s).unwrap_or_default()) + .map(|s| { + serde_json::from_str::>(&s).unwrap_or_else(|e| { + tracing::warn!(raw = %s, error = %e, "failed to parse also_known_as JSON"); + vec![s] + }) + }) .unwrap_or_default(), } } diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 8250efa..f1dad6b 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -61,6 +61,10 @@ pub enum DomainEvent { remote_actor_url: String, outbox_url: String, }, + BackfillFollower { + owner_user_id: UserId, + follower_inbox_url: String, + }, } #[async_trait] diff --git a/crates/worker/src/follow_backfill_handler.rs b/crates/worker/src/follow_backfill_handler.rs index 3886da3..386377f 100644 --- a/crates/worker/src/follow_backfill_handler.rs +++ b/crates/worker/src/follow_backfill_handler.rs @@ -10,18 +10,29 @@ pub struct FollowBackfillHandler { #[async_trait] impl EventHandler for FollowBackfillHandler { async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { - let DomainEvent::FollowAccepted { - remote_actor_url, - outbox_url, - .. - } = event - else { - return Ok(()); - }; - tracing::info!(actor = %remote_actor_url, outbox = %outbox_url, "starting outbox backfill"); - self.ap_service - .import_remote_outbox(outbox_url, remote_actor_url) - .await - .map_err(|e| DomainError::InfrastructureError(e.to_string())) + match event { + DomainEvent::FollowAccepted { + remote_actor_url, + outbox_url, + .. + } => { + tracing::info!(actor = %remote_actor_url, outbox = %outbox_url, "importing remote outbox"); + self.ap_service + .import_remote_outbox(outbox_url, remote_actor_url) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + DomainEvent::BackfillFollower { + owner_user_id, + follower_inbox_url, + } => { + tracing::info!(owner = %owner_user_id.value(), inbox = %follower_inbox_url, "backfilling local content to new follower"); + self.ap_service + .run_backfill_for_follower(owner_user_id.value(), follower_inbox_url.clone()) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + _ => Ok(()), + } } }