federation improvements
This commit is contained in:
@@ -61,6 +61,7 @@ impl Activity for FollowActivity {
|
||||
local_actor.user_id.clone(),
|
||||
self.actor.inner().as_str(),
|
||||
FollowerStatus::Pending,
|
||||
self.id.as_str(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -292,7 +293,7 @@ pub struct UpdateActivity {
|
||||
#[serde(rename = "type", default)]
|
||||
pub(crate) kind: UpdateType,
|
||||
pub(crate) actor: ObjectId<DbActor>,
|
||||
pub(crate) object: ReviewObject,
|
||||
pub(crate) object: serde_json::Value,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -309,19 +310,26 @@ impl Activity for UpdateActivity {
|
||||
}
|
||||
|
||||
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
if self.object.attributed_to.inner() != self.actor.inner() {
|
||||
return Err(Error::bad_request(anyhow::anyhow!(
|
||||
"update actor does not match object attributed_to"
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
let ap_id = self.object.id.inner().as_str();
|
||||
let rating = self.object.rating.min(5);
|
||||
let comment = self.object.comment.as_deref();
|
||||
let watched_at = self.object.watched_at.naive_utc();
|
||||
let object: ReviewObject = match serde_json::from_value(self.object) {
|
||||
Ok(o) => o,
|
||||
Err(_) => {
|
||||
tracing::debug!(actor = %self.actor.inner(), "ignoring non-review Update activity");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
if object.attributed_to.inner() != self.actor.inner() {
|
||||
return Err(Error::bad_request(anyhow::anyhow!(
|
||||
"update actor does not match object attributed_to"
|
||||
)));
|
||||
}
|
||||
let ap_id = object.id.inner().as_str();
|
||||
let rating = object.rating.min(5);
|
||||
let comment = object.comment.as_deref();
|
||||
let watched_at = object.watched_at.naive_utc();
|
||||
data.federation_repo
|
||||
.update_remote_review(ap_id, self.actor.inner().as_str(), rating, comment, watched_at)
|
||||
.await?;
|
||||
|
||||
@@ -26,6 +26,7 @@ impl ApFederationConfig {
|
||||
.domain(&data.domain)
|
||||
.app_data(data)
|
||||
.debug(true)
|
||||
.http_signature_compat(true)
|
||||
.url_verifier(Box::new(PermissiveVerifier))
|
||||
.build()
|
||||
.await?
|
||||
@@ -34,6 +35,7 @@ impl ApFederationConfig {
|
||||
.domain(&data.domain)
|
||||
.app_data(data)
|
||||
.debug(false)
|
||||
.http_signature_compat(true)
|
||||
.build()
|
||||
.await?
|
||||
};
|
||||
|
||||
@@ -34,7 +34,8 @@ pub struct Follower {
|
||||
|
||||
#[async_trait]
|
||||
pub trait FederationRepository: Send + Sync {
|
||||
async fn add_follower(&self, local_user_id: UserId, remote_actor_url: &str, status: FollowerStatus) -> Result<()>;
|
||||
async fn add_follower(&self, local_user_id: UserId, remote_actor_url: &str, status: FollowerStatus, follow_activity_id: &str) -> Result<()>;
|
||||
async fn get_follower_follow_activity_id(&self, local_user_id: UserId, remote_actor_url: &str) -> Result<Option<String>>;
|
||||
async fn remove_follower(&self, local_user_id: UserId, remote_actor_url: &str) -> Result<()>;
|
||||
async fn get_followers(&self, local_user_id: UserId) -> Result<Vec<Follower>>;
|
||||
async fn update_follower_status(&self, local_user_id: UserId, remote_actor_url: &str, status: FollowerStatus) -> Result<()>;
|
||||
|
||||
@@ -244,7 +244,12 @@ impl ActivityPubService {
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("remote actor not found"))?;
|
||||
|
||||
let follow_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let follow_id_str = data
|
||||
.federation_repo
|
||||
.get_follower_follow_activity_id(local_user_id.clone(), remote_actor_url)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("follow activity id not found for {}", remote_actor_url))?;
|
||||
let follow_id = Url::parse(&follow_id_str)?;
|
||||
let follow = FollowActivity {
|
||||
id: follow_id,
|
||||
kind: Default::default(),
|
||||
@@ -341,6 +346,37 @@ impl ActivityPubService {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_accepted_followers(
|
||||
&self,
|
||||
local_user_id: UserId,
|
||||
) -> anyhow::Result<Vec<RemoteActor>> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
Ok(followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.map(|f| f.actor)
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn count_accepted_followers(&self, local_user_id: UserId) -> anyhow::Result<usize> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||
Ok(followers
|
||||
.into_iter()
|
||||
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||
.count())
|
||||
}
|
||||
|
||||
pub async fn remove_follower(
|
||||
&self,
|
||||
local_user_id: UserId,
|
||||
actor_url: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
data.federation_repo.remove_follower(local_user_id, actor_url).await
|
||||
}
|
||||
|
||||
fn spawn_backfill(&self, owner_user_id: UserId, follower_inbox_url: String) {
|
||||
let config = self.federation_config.clone();
|
||||
let base_url = self.base_url.clone();
|
||||
@@ -369,10 +405,11 @@ impl ActivityPubService {
|
||||
let inbox = Url::parse(&follower_inbox_url)?;
|
||||
|
||||
let history = data.movie_repo.get_user_history(&owner_user_id).await?;
|
||||
let local_reviews: Vec<_> = history
|
||||
let mut local_reviews: Vec<_> = history
|
||||
.into_iter()
|
||||
.filter(|e| matches!(e.review().source(), domain::models::ReviewSource::Local))
|
||||
.collect();
|
||||
local_reviews.reverse(); // oldest first so Mastodon feed is chronological
|
||||
|
||||
let total = local_reviews.len();
|
||||
|
||||
@@ -418,12 +455,14 @@ impl ActivityPubService {
|
||||
use activitypub_federation::traits::Object;
|
||||
use crate::objects::DbReview;
|
||||
|
||||
let ap_id = crate::urls::review_url(base_url, review.id());
|
||||
let review_id = review.id().clone();
|
||||
let ap_id = crate::urls::review_url(base_url, &review_id);
|
||||
let db_review = DbReview { review, ap_id };
|
||||
let object = db_review.into_json(data).await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let activity_id = crate::urls::activity_url(base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let activity_id = crate::urls::create_activity_url(base_url, &review_id)
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let create = CreateActivity {
|
||||
id: activity_id,
|
||||
kind: Default::default(),
|
||||
|
||||
@@ -29,3 +29,10 @@ pub fn review_url(base_url: &str, review_id: &ReviewId) -> Url {
|
||||
Url::parse(&format!("{}/reviews/{}", base_url, review_id.value()))
|
||||
.expect("base_url is always a valid URL prefix")
|
||||
}
|
||||
|
||||
/// Stable Create-activity URL derived from review ID.
|
||||
/// Deterministic so repeated backfills to different followers don't create duplicate posts.
|
||||
pub fn create_activity_url(base_url: &str, review_id: &ReviewId) -> Result<Url, Error> {
|
||||
Url::parse(&format!("{}/activities/create/{}", base_url, review_id.value()))
|
||||
.map_err(|e| Error::bad_request(anyhow::anyhow!(e)))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user