diff --git a/src/actors.rs b/src/actors.rs index 4cf8fa7..7cd4996 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -39,6 +39,7 @@ pub struct DbActor { pub attachment: Vec, pub manually_approves_followers: bool, pub actor_type: ApActorType, + pub featured_url: Option, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -100,6 +101,8 @@ pub struct Person { also_known_as: Vec, #[serde(skip_serializing_if = "Vec::is_empty", default)] attachment: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + featured: Option, } struct ActorUrls { @@ -189,6 +192,7 @@ pub async fn get_local_actor( attachment: user.attachment, manually_approves_followers: user.manually_approves_followers, actor_type: user.actor_type, + featured_url: user.featured_url, }) } @@ -264,6 +268,7 @@ impl Object for DbActor { attachment: user.attachment, manually_approves_followers: user.manually_approves_followers, actor_type: user.actor_type, + featured_url: user.featured_url, })) } @@ -316,6 +321,7 @@ impl Object for DbActor { image, also_known_as, attachment, + featured: self.featured_url, }) } @@ -399,6 +405,7 @@ impl Object for DbActor { .collect(), manually_approves_followers: json.manually_approves_followers, actor_type: json.kind, + featured_url: json.featured, }) } } diff --git a/src/service/backfill.rs b/src/service/backfill.rs index 2554145..73539d8 100644 --- a/src/service/backfill.rs +++ b/src/service/backfill.rs @@ -78,12 +78,27 @@ impl ActivityPubService { let data = config.to_request_data(); let local_actor = get_local_actor(owner_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let inbox = Url::parse(&follower_inbox_url)?; - let mut objects = data.object_handler.get_local_objects_for_user(owner_user_id).await?; - objects.reverse(); - let total = objects.len(); - let (mut success_count, mut failure_count) = (0usize, 0usize); - for chunk in objects.chunks(BATCH_SIZE) { - for (ap_id, object_json) in chunk { + + // Cursor-based pagination via get_local_objects_page (newest-first). + // Avoids loading the entire post history into memory at once. + let mut before: Option> = None; + let (mut success_count, mut failure_count, mut total) = (0usize, 0usize, 0usize); + + loop { + let page = data + .object_handler + .get_local_objects_page(owner_user_id, before, BATCH_SIZE) + .await?; + + if page.is_empty() { + break; + } + + let is_last_page = page.len() < BATCH_SIZE; + // Advance cursor to the oldest timestamp in this page. + before = page.last().map(|(_, _, ts)| *ts); + + for (ap_id, object_json, _ts) in &page { let create_id = Url::parse(&format!( "{}/activities/create/{}", base_url, @@ -94,16 +109,35 @@ impl ActivityPubService { actor: ObjectId::from(local_actor.ap_id.clone()), object: object_json.clone(), to: vec![], cc: vec![], bto: vec![], bcc: vec![], }; - let sends = SendActivityTask::prepare(&WithContext::new_default(create), &local_actor, vec![inbox.clone()], &data).await?; + let sends = SendActivityTask::prepare( + &WithContext::new_default(create), + &local_actor, + vec![inbox.clone()], + &data, + ).await?; + total += 1; if send_with_retry(sends, &data, max_attempts, initial_delay).await.is_empty() { success_count += 1; } else { failure_count += 1; } } + + if is_last_page { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(super::BATCH_FETCH_SLEEP_MS)).await; } - tracing::info!(user_id = %owner_user_id, follower = %follower_inbox_url, sent = success_count, failed = failure_count, total = total, "backfill complete"); + + tracing::info!( + user_id = %owner_user_id, + follower = %follower_inbox_url, + sent = success_count, + failed = failure_count, + total = total, + "backfill complete" + ); Ok(()) } } diff --git a/src/service/broadcast.rs b/src/service/broadcast.rs index a8b36c4..b97b855 100644 --- a/src/service/broadcast.rs +++ b/src/service/broadcast.rs @@ -119,7 +119,7 @@ impl ActivityPubService { id: activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), - object: serde_json::json!(ap_id.to_string()), + object: serde_json::json!({"type": "Tombstone", "id": ap_id.to_string()}), to: vec![crate::urls::AS_PUBLIC.to_string()], cc: vec![local_actor.followers_url.to_string()], }; diff --git a/src/tests/actors.rs b/src/tests/actors.rs index b19a8f2..7b95828 100644 --- a/src/tests/actors.rs +++ b/src/tests/actors.rs @@ -34,6 +34,7 @@ fn person_serializes_with_enriched_fields() { image: None, also_known_as: vec![], attachment: vec![], + featured: Some("https://example.com/users/1/featured".parse().unwrap()), }; let json = serde_json::to_value(&person).unwrap(); assert_eq!(json["discoverable"], true); diff --git a/src/user.rs b/src/user.rs index 69fe07b..a3cd2f5 100644 --- a/src/user.rs +++ b/src/user.rs @@ -58,6 +58,9 @@ pub struct ApUser { pub manually_approves_followers: bool, /// AP actor type serialized in the actor JSON. Defaults to `Person`. pub actor_type: ApActorType, + /// URL of the `featured` (pinned posts) collection. Set to expose a pinned + /// posts collection in the actor JSON, compatible with Mastodon/Pleroma. + pub featured_url: Option, } #[async_trait]