feat: actor cache TTL with staleness-aware re-fetch
Adds fetched_at to RemoteActor, configurable TTL via builder (.actor_cache_ttl_secs, default 24h), and get_or_refresh_remote_actor helper that re-fetches stale actors from origin. Closes #3
This commit is contained in:
@@ -375,6 +375,7 @@ impl Object for DbActor {
|
||||
followers_url: json.followers.as_ref().map(|u| u.to_string()),
|
||||
following_url: json.following.as_ref().map(|u| u.to_string()),
|
||||
also_known_as: json.also_known_as.clone(),
|
||||
fetched_at: Some(Utc::now()),
|
||||
};
|
||||
data.actor_repo.upsert_remote_actor(actor).await?;
|
||||
|
||||
|
||||
@@ -63,6 +63,7 @@ pub struct FederationData {
|
||||
pub(crate) allow_registration: bool,
|
||||
pub(crate) software_name: String,
|
||||
pub(crate) event_publisher: Option<Arc<dyn EventPublisher>>,
|
||||
pub(crate) actor_cache_ttl: std::time::Duration,
|
||||
}
|
||||
|
||||
impl FederationData {
|
||||
@@ -79,6 +80,7 @@ impl FederationData {
|
||||
allow_registration: bool,
|
||||
software_name: String,
|
||||
event_publisher: Option<Arc<dyn EventPublisher>>,
|
||||
actor_cache_ttl: std::time::Duration,
|
||||
) -> Self {
|
||||
let domain = base_url
|
||||
.trim_start_matches("https://")
|
||||
@@ -100,6 +102,7 @@ impl FederationData {
|
||||
allow_registration,
|
||||
software_name,
|
||||
event_publisher,
|
||||
actor_cache_ttl,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ pub use actor::ActorRepository;
|
||||
pub use blocklist::BlocklistRepository;
|
||||
pub use follow::FollowRepository;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum FollowerStatus {
|
||||
Pending,
|
||||
@@ -35,6 +37,10 @@ pub struct RemoteActor {
|
||||
pub followers_url: Option<String>,
|
||||
pub following_url: Option<String>,
|
||||
pub also_known_as: Vec<String>,
|
||||
/// When this actor was last fetched from the origin instance.
|
||||
/// `None` means unknown — treated as always-fresh to avoid
|
||||
/// breaking existing consumers that don't populate this field.
|
||||
pub fetched_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
use activitypub_federation::fetch::object_id::ObjectId;
|
||||
use url::Url;
|
||||
|
||||
use crate::actors::DbActor;
|
||||
use crate::repository::RemoteActor;
|
||||
|
||||
use super::ActivityPubService;
|
||||
|
||||
impl ActivityPubService {
|
||||
@@ -17,4 +21,42 @@ impl ActivityPubService {
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
Ok(res.object)
|
||||
}
|
||||
|
||||
/// Get a cached remote actor, re-fetching from origin if stale.
|
||||
///
|
||||
/// Returns `None` if the actor has never been seen. Staleness is
|
||||
/// determined by `actor_cache_ttl_secs` (builder config).
|
||||
pub async fn get_or_refresh_remote_actor(
|
||||
&self,
|
||||
actor_url: &str,
|
||||
) -> anyhow::Result<Option<RemoteActor>> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let cached = data.actor_repo.get_remote_actor(actor_url).await?;
|
||||
if let Some(ref actor) = cached {
|
||||
let is_fresh = actor
|
||||
.fetched_at
|
||||
.map(|t| {
|
||||
let age = chrono::Utc::now().signed_duration_since(t);
|
||||
age < chrono::Duration::from_std(data.actor_cache_ttl).unwrap_or_default()
|
||||
})
|
||||
.unwrap_or(true);
|
||||
if is_fresh {
|
||||
return Ok(cached);
|
||||
}
|
||||
}
|
||||
let url = match Url::parse(actor_url) {
|
||||
Ok(u) => u,
|
||||
Err(_) => return Ok(cached),
|
||||
};
|
||||
match ObjectId::<DbActor>::from(url)
|
||||
.dereference_forced(&data)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(data.actor_repo.get_remote_actor(actor_url).await?),
|
||||
Err(e) => {
|
||||
tracing::warn!(actor_url, error = %e, "re-fetch failed, using stale cache");
|
||||
Ok(cached)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ impl ActivityPubService {
|
||||
followers_url: Some(remote_actor.followers_url.to_string()),
|
||||
following_url: Some(remote_actor.following_url.to_string()),
|
||||
also_known_as: remote_actor.also_known_as.clone(),
|
||||
fetched_at: Some(chrono::Utc::now()),
|
||||
};
|
||||
// Save BEFORE delivering — prevents lost state on process restart.
|
||||
data.follow_repo
|
||||
@@ -356,6 +357,7 @@ impl ActivityPubService {
|
||||
followers_url: None,
|
||||
following_url: None,
|
||||
also_known_as: vec![],
|
||||
fetched_at: None,
|
||||
},
|
||||
};
|
||||
actors.push(actor);
|
||||
@@ -403,6 +405,7 @@ impl ActivityPubService {
|
||||
followers_url: Some(format!("{}/followers", target_actor_url)),
|
||||
following_url: Some(format!("{}/following", target_actor_url)),
|
||||
also_known_as: target.also_known_as,
|
||||
fetched_at: None,
|
||||
};
|
||||
data.follow_repo
|
||||
.add_following(local_user_id, target_as_remote, &follow_id)
|
||||
|
||||
@@ -34,6 +34,8 @@ pub const DELIVERY_INITIAL_DELAY_SECS: u64 = 1;
|
||||
pub const HTTP_FETCH_TIMEOUT_SECS: u64 = 30;
|
||||
/// Sleep between backfill send batches.
|
||||
pub const BATCH_FETCH_SLEEP_MS: u64 = 100;
|
||||
/// Default actor cache TTL in seconds (24 hours).
|
||||
pub const ACTOR_CACHE_TTL_SECS: u64 = 24 * 60 * 60;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ActivityPubService {
|
||||
@@ -59,6 +61,7 @@ pub struct ActivityPubServiceBuilder {
|
||||
delivery_max_attempts: u32,
|
||||
delivery_initial_delay_secs: u64,
|
||||
signed_fetch_actor_id: Option<uuid::Uuid>,
|
||||
actor_cache_ttl_secs: u64,
|
||||
}
|
||||
|
||||
impl ActivityPubServiceBuilder {
|
||||
@@ -115,6 +118,13 @@ impl ActivityPubServiceBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// How long cached remote actors are considered fresh (seconds, default 24h).
|
||||
/// After this duration, the next access re-fetches the actor from origin.
|
||||
pub fn actor_cache_ttl_secs(mut self, v: u64) -> Self {
|
||||
self.actor_cache_ttl_secs = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a local actor whose keypair signs all outgoing fetch requests
|
||||
/// (HTTP Signature on GETs). Required for federating with instances
|
||||
/// that enforce authorized-fetch / Secure Mode.
|
||||
@@ -157,6 +167,7 @@ impl ActivityPubServiceBuilder {
|
||||
self.allow_registration,
|
||||
self.software_name,
|
||||
self.event_publisher,
|
||||
std::time::Duration::from_secs(self.actor_cache_ttl_secs),
|
||||
);
|
||||
let signing_actor = if let Some(uid) = self.signed_fetch_actor_id {
|
||||
let actor = crate::actors::build_local_actor(
|
||||
@@ -199,6 +210,7 @@ impl ActivityPubService {
|
||||
delivery_max_attempts: DELIVERY_MAX_ATTEMPTS,
|
||||
delivery_initial_delay_secs: DELIVERY_INITIAL_DELAY_SECS,
|
||||
signed_fetch_actor_id: None,
|
||||
actor_cache_ttl_secs: ACTOR_CACHE_TTL_SECS,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -520,6 +520,7 @@ async fn setup(blocklist: MemBlocklistRepo, local_user_id: uuid::Uuid) -> TestSe
|
||||
false,
|
||||
"test".to_string(),
|
||||
None,
|
||||
std::time::Duration::from_secs(24 * 60 * 60),
|
||||
);
|
||||
|
||||
let config = FederationConfig::builder()
|
||||
|
||||
@@ -374,6 +374,7 @@ fn make_data(
|
||||
false,
|
||||
"test".to_string(),
|
||||
None,
|
||||
std::time::Duration::from_secs(24 * 60 * 60),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user