feat: review delete/update AP propagation, remote actor avatars, OpenAPI updates

- Send Delete activity when a review is deleted
- Send Update(Note) activity when a review is edited
- Remote actor avatars shown in followers/following pages
- OpenAPI spec updated with profile, blocked domains, blocked actor endpoints
- Fix: worker wire() call missing allow_registration argument
This commit is contained in:
2026-05-12 02:02:41 +02:00
parent 2da2075d03
commit a4b311dde3
17 changed files with 324 additions and 11 deletions

View File

@@ -171,7 +171,7 @@ impl Object for DbActor {
}))
}
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
let public_key = PublicKey {
id: format!("{}#main-key", &self.ap_id),
owner: self.ap_id.clone(),

View File

@@ -501,6 +501,137 @@ impl ActivityPubService {
Ok(())
}
/// Broadcast a Delete activity to all accepted followers for a removed review.
pub async fn broadcast_delete_to_followers(
&self,
local_user_id: uuid::Uuid,
ap_id: Url,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(local_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let followers = data.federation_repo.get_followers(local_user_id).await?;
let blocked = data
.federation_repo
.get_blocked_actors(local_user_id)
.await
.unwrap_or_default();
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
let blocked_domains = data
.federation_repo
.get_blocked_domains()
.await
.unwrap_or_default();
let blocked_domain_set: std::collections::HashSet<String> =
blocked_domains.into_iter().map(|d| d.domain).collect();
let accepted: Vec<_> = followers
.into_iter()
.filter(|f| f.status == FollowerStatus::Accepted)
.filter(|f| !blocked_set.contains(&f.actor.url))
.filter(|f| {
let domain = url::Url::parse(&f.actor.inbox_url)
.ok()
.and_then(|u| u.host_str().map(|s| s.to_string()))
.unwrap_or_default();
!blocked_domain_set.contains(&domain)
})
.collect();
if accepted.is_empty() {
return Ok(());
}
let delete_id = crate::urls::activity_url(&self.base_url)
.map_err(|e| anyhow::anyhow!("{e}"))?;
let delete = crate::activities::DeleteActivity {
id: delete_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object: ap_id,
};
let delete_with_ctx = WithContext::new_default(delete);
let inboxes = collect_inboxes(&accepted);
let sends =
SendActivityTask::prepare(&delete_with_ctx, &local_actor, inboxes, &data).await?;
let failures = send_with_retry(sends, &data).await;
if !failures.is_empty() {
tracing::warn!(
count = failures.len(),
"some delete activity deliveries failed"
);
}
Ok(())
}
/// Broadcast an Update(Note) activity to all accepted followers for an edited review.
pub async fn broadcast_update_to_followers(
&self,
local_user_id: uuid::Uuid,
object: serde_json::Value,
) -> anyhow::Result<()> {
let data = self.federation_config.to_request_data();
let local_actor = get_local_actor(local_user_id, &data)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let followers = data.federation_repo.get_followers(local_user_id).await?;
let blocked = data
.federation_repo
.get_blocked_actors(local_user_id)
.await
.unwrap_or_default();
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
let blocked_domains = data
.federation_repo
.get_blocked_domains()
.await
.unwrap_or_default();
let blocked_domain_set: std::collections::HashSet<String> =
blocked_domains.into_iter().map(|d| d.domain).collect();
let accepted: Vec<_> = followers
.into_iter()
.filter(|f| f.status == FollowerStatus::Accepted)
.filter(|f| !blocked_set.contains(&f.actor.url))
.filter(|f| {
let domain = url::Url::parse(&f.actor.inbox_url)
.ok()
.and_then(|u| u.host_str().map(|s| s.to_string()))
.unwrap_or_default();
!blocked_domain_set.contains(&domain)
})
.collect();
if accepted.is_empty() {
return Ok(());
}
let update_id = Url::parse(&format!(
"{}/activities/update/{}",
self.base_url,
uuid::Uuid::new_v4()
))?;
let update = crate::activities::UpdateActivity {
id: update_id,
kind: Default::default(),
actor: ObjectId::from(local_actor.ap_id.clone()),
object,
};
let update_with_ctx = WithContext::new_default(update);
let inboxes = collect_inboxes(&accepted);
let sends =
SendActivityTask::prepare(&update_with_ctx, &local_actor, inboxes, &data).await?;
let failures = send_with_retry(sends, &data).await;
if !failures.is_empty() {
tracing::warn!(
count = failures.len(),
"some update activity deliveries failed"
);
}
Ok(())
}
pub async fn broadcast_actor_update(&self, user_id: uuid::Uuid) -> anyhow::Result<()> {
use activitypub_federation::traits::Object;

View File

@@ -40,12 +40,18 @@ impl ActivityPubEventHandler {
impl EventHandler for ActivityPubEventHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
match event {
DomainEvent::ReviewLogged {
review_id, user_id, ..
} => self
DomainEvent::ReviewLogged { review_id, user_id, .. } => self
.on_review_logged(user_id, review_id)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string())),
DomainEvent::ReviewUpdated { review_id, user_id, .. } => self
.on_review_updated(user_id, review_id)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string())),
DomainEvent::ReviewDeleted { review_id, user_id } => self
.on_review_deleted(user_id, review_id)
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string())),
DomainEvent::UserUpdated { user_id } => self
.ap_service
.broadcast_actor_update(user_id.value())
@@ -102,4 +108,58 @@ impl ActivityPubEventHandler {
Ok(())
}
async fn on_review_updated(&self, user_id: &UserId, review_id: &ReviewId) -> anyhow::Result<()> {
let review = match self.review_repository.get_review_by_id(review_id).await? {
Some(r) => r,
None => return Ok(()),
};
let ap_id = review_url(&self.base_url, review_id);
let actor = actor_url(&self.base_url, user_id.value());
let movie = self
.movie_repository
.get_movie_by_id(review.movie_id())
.await
.ok()
.flatten();
let movie_title = movie
.as_ref()
.map(|m| m.title().value().to_string())
.unwrap_or_else(|| "Unknown".to_string());
let release_year = movie
.as_ref()
.map(|m| m.release_year().value())
.unwrap_or(0);
let poster_url = movie
.as_ref()
.and_then(|m| m.poster_path())
.map(|p| format!("{}/images/{}", self.base_url, p.value()));
let obj = review_to_ap_object(
&review,
ap_id,
actor,
movie_title,
release_year,
poster_url,
&self.base_url,
);
let json = serde_json::to_value(obj)?;
self.ap_service
.broadcast_update_to_followers(user_id.value(), json)
.await?;
Ok(())
}
async fn on_review_deleted(&self, user_id: &UserId, review_id: &ReviewId) -> anyhow::Result<()> {
let ap_id = review_url(&self.base_url, review_id);
self.ap_service
.broadcast_delete_to_followers(user_id.value(), ap_id)
.await?;
Ok(())
}
}

View File

@@ -35,6 +35,10 @@ pub enum EventPayload {
UserUpdated {
user_id: String,
},
ReviewDeleted {
review_id: String,
user_id: String,
},
}
impl EventPayload {
@@ -45,6 +49,7 @@ impl EventPayload {
EventPayload::MovieDiscovered { .. } => "MovieDiscovered",
EventPayload::MovieDeleted { .. } => "MovieDeleted",
EventPayload::UserUpdated { .. } => "UserUpdated",
EventPayload::ReviewDeleted { .. } => "ReviewDeleted",
}
}
}
@@ -94,6 +99,10 @@ impl From<&DomainEvent> for EventPayload {
DomainEvent::UserUpdated { user_id } => EventPayload::UserUpdated {
user_id: user_id.value().to_string(),
},
DomainEvent::ReviewDeleted { review_id, user_id } => EventPayload::ReviewDeleted {
review_id: review_id.value().to_string(),
user_id: user_id.value().to_string(),
},
}
}
}
@@ -139,6 +148,12 @@ impl TryFrom<EventPayload> for DomainEvent {
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
})
}
EventPayload::ReviewDeleted { review_id, user_id } => {
Ok(DomainEvent::ReviewDeleted {
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
})
}
}
}
}

View File

@@ -4,6 +4,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
let suffix = match event {
DomainEvent::ReviewLogged { .. } => "review.logged",
DomainEvent::ReviewUpdated { .. } => "review.updated",
DomainEvent::ReviewDeleted { .. } => "review.deleted",
DomainEvent::MovieDiscovered { .. } => "movie.discovered",
DomainEvent::MovieDeleted { .. } => "movie.deleted",
DomainEvent::UserUpdated { .. } => "user.updated",

View File

@@ -205,6 +205,7 @@ struct RemoteActorData {
handle: String,
display_name: Option<String>,
url: String,
avatar_url: Option<String>,
}
#[derive(Template)]
@@ -578,6 +579,7 @@ impl HtmlRenderer for AskamaHtmlRenderer {
handle: a.handle,
url: a.url,
display_name: a.display_name,
avatar_url: a.avatar_url,
})
.collect(),
sort_by: data.sort_by.clone(),
@@ -613,6 +615,7 @@ impl HtmlRenderer for AskamaHtmlRenderer {
handle: a.handle,
display_name: a.display_name,
url: a.url,
avatar_url: a.avatar_url,
})
.collect(),
error: data.error,
@@ -632,6 +635,7 @@ impl HtmlRenderer for AskamaHtmlRenderer {
handle: a.handle,
display_name: a.display_name,
url: a.url,
avatar_url: a.avatar_url,
})
.collect(),
error: data.error,

View File

@@ -10,6 +10,9 @@
<ul class="following-list">
{% for actor in actors %}
<li class="following-item">
{% if let Some(avatar) = actor.avatar_url %}
<img src="{{ avatar }}" alt="" style="width:32px;height:32px;border-radius:50%;vertical-align:middle;margin-right:6px" />
{% endif %}
<strong>{{ actor.handle }}</strong>
{% if let Some(name) = actor.display_name %}
({{ name }})

View File

@@ -10,6 +10,9 @@
<ul class="following-list">
{% for actor in actors %}
<li class="following-item">
{% if let Some(avatar) = actor.avatar_url %}
<img src="{{ avatar }}" alt="" style="width:32px;height:32px;border-radius:50%;vertical-align:middle;margin-right:6px" />
{% endif %}
<strong>{{ actor.handle }}</strong>
{% if let Some(name) = actor.display_name %}
({{ name }})

View File

@@ -9,6 +9,7 @@ pub struct RemoteActorView {
pub handle: String,
pub display_name: Option<String>,
pub url: String,
pub avatar_url: Option<String>,
}
pub struct HtmlPageContext {

View File

@@ -22,6 +22,17 @@ pub async fn execute(ctx: &AppContext, cmd: DeleteReviewCommand) -> Result<(), D
let movie_id = review.movie_id().clone();
ctx.review_repository.delete_review(&review_id).await?;
if let Err(e) = ctx
.event_publisher
.publish(&DomainEvent::ReviewDeleted {
review_id: review_id.clone(),
user_id: requesting_user_id.clone(),
})
.await
{
tracing::warn!("failed to publish ReviewDeleted: {e}");
}
let history = ctx.diary_repository.get_review_history(&movie_id).await?;
if history.viewings().is_empty() {
let poster_path = history.movie().poster_path().cloned();

View File

@@ -93,6 +93,7 @@ mod tests {
DomainEvent::MovieDiscovered { .. } => "movie_discovered",
DomainEvent::ReviewLogged { .. } => "review_logged",
DomainEvent::ReviewUpdated { .. } => "review_updated",
DomainEvent::ReviewDeleted { .. } => "review_deleted",
DomainEvent::MovieDeleted { .. } => "movie_deleted",
DomainEvent::UserUpdated { .. } => "user_updated",
};

View File

@@ -33,6 +33,10 @@ pub enum DomainEvent {
UserUpdated {
user_id: UserId,
},
ReviewDeleted {
review_id: ReviewId,
user_id: UserId,
},
}
#[async_trait]

View File

@@ -495,20 +495,20 @@ pub struct MovieDetailResponse {
pub reviews: SocialFeedResponse,
}
#[derive(serde::Serialize)]
#[derive(serde::Serialize, utoipa::ToSchema)]
pub struct BlockedDomainResponse {
pub domain: String,
pub reason: Option<String>,
pub blocked_at: String,
}
#[derive(serde::Deserialize)]
#[derive(serde::Deserialize, utoipa::ToSchema)]
pub struct AddBlockedDomainRequest {
pub domain: String,
pub reason: Option<String>,
}
#[derive(serde::Serialize)]
#[derive(serde::Serialize, utoipa::ToSchema)]
pub struct BlockedActorResponse {
pub url: String,
pub handle: String,

View File

@@ -412,6 +412,15 @@ fn entry_to_dto(entry: &DiaryEntry) -> DiaryEntryDto {
}
#[cfg(feature = "federation")]
#[utoipa::path(
get, path = "/api/v1/admin/blocked-domains",
responses(
(status = 200, body = Vec<crate::dtos::BlockedDomainResponse>),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden — admin only"),
),
security(("bearer_auth" = []))
)]
pub async fn get_blocked_domains_admin(
State(state): State<AppState>,
_admin: crate::extractors::AdminUser,
@@ -433,6 +442,16 @@ pub async fn get_blocked_domains_admin(
}
#[cfg(feature = "federation")]
#[utoipa::path(
post, path = "/api/v1/admin/blocked-domains",
request_body = crate::dtos::AddBlockedDomainRequest,
responses(
(status = 201, description = "Domain blocked"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden — admin only"),
),
security(("bearer_auth" = []))
)]
pub async fn add_blocked_domain_admin(
State(state): State<AppState>,
_admin: crate::extractors::AdminUser,
@@ -445,6 +464,16 @@ pub async fn add_blocked_domain_admin(
}
#[cfg(feature = "federation")]
#[utoipa::path(
delete, path = "/api/v1/admin/blocked-domains/{domain}",
params(("domain" = String, Path, description = "Domain to unblock")),
responses(
(status = 204, description = "Domain unblocked"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden — admin only"),
),
security(("bearer_auth" = []))
)]
pub async fn remove_blocked_domain_admin(
State(state): State<AppState>,
_admin: crate::extractors::AdminUser,
@@ -457,6 +486,15 @@ pub async fn remove_blocked_domain_admin(
}
#[cfg(feature = "federation")]
#[utoipa::path(
post, path = "/api/v1/social/block",
request_body = ActorUrlRequest,
responses(
(status = 204, description = "Actor blocked"),
(status = 401, description = "Unauthorized"),
),
security(("bearer_auth" = []))
)]
pub async fn block_actor_api(
State(state): State<AppState>,
user: AuthenticatedUser,
@@ -469,6 +507,15 @@ pub async fn block_actor_api(
}
#[cfg(feature = "federation")]
#[utoipa::path(
post, path = "/api/v1/social/unblock",
request_body = ActorUrlRequest,
responses(
(status = 204, description = "Actor unblocked"),
(status = 401, description = "Unauthorized"),
),
security(("bearer_auth" = []))
)]
pub async fn unblock_actor_api(
State(state): State<AppState>,
user: AuthenticatedUser,
@@ -481,6 +528,14 @@ pub async fn unblock_actor_api(
}
#[cfg(feature = "federation")]
#[utoipa::path(
get, path = "/api/v1/social/blocked",
responses(
(status = 200, body = Vec<crate::dtos::BlockedActorResponse>),
(status = 401, description = "Unauthorized"),
),
security(("bearer_auth" = []))
)]
pub async fn get_blocked_actors_api(
State(state): State<AppState>,
user: AuthenticatedUser,

View File

@@ -481,6 +481,7 @@ pub async fn get_users_list(
handle: a.handle,
display_name: a.display_name,
url: a.url,
avatar_url: None,
})
.collect();
let data = application::ports::UsersPageData {
@@ -619,6 +620,7 @@ pub async fn get_user_profile(
handle: a.handle,
url: a.url,
display_name: a.display_name,
avatar_url: a.avatar_url.clone(),
})
.collect()
} else {
@@ -818,6 +820,7 @@ pub async fn get_following_page(
handle: a.handle,
display_name: a.display_name,
url: a.url,
avatar_url: a.avatar_url.clone(),
})
.collect();
let data = FollowingPageData {
@@ -871,6 +874,7 @@ pub async fn get_followers_page(
handle: a.handle,
display_name: a.display_name,
url: a.url,
avatar_url: a.avatar_url.clone(),
})
.collect();
let data = FollowersPageData {

View File

@@ -7,15 +7,18 @@ use crate::dtos::{
ActivityFeedResponse, DiaryEntryDto, DiaryResponse,
DirectorStatDto, FeedEntryDto, LoginRequest, LoginResponse, LogReviewRequest,
MonthActivityDto, MonthlyRatingDto, MovieDetailResponse, MovieDto, MovieStatsDto,
RegisterRequest, ReviewDto, ReviewHistoryResponse, SocialFeedResponse, SocialReviewDto,
UserProfileResponse, UserStatsDto, UserSummaryDto, UserTrendsDto, UsersResponse,
ProfileResponse, RegisterRequest, ReviewDto, ReviewHistoryResponse, SocialFeedResponse,
SocialReviewDto, UserProfileResponse, UserStatsDto, UserSummaryDto, UserTrendsDto, UsersResponse,
};
use crate::handlers::import::{
ApiFieldMapping, ApplyMappingRequest, ConfirmRequest, SaveProfileRequest,
SessionCreatedResponse, SessionStateResponse,
};
#[cfg(feature = "federation")]
use crate::dtos::{ActorListResponse, ActorUrlRequest, FollowRequest, RemoteActorDto};
use crate::dtos::{
ActorListResponse, ActorUrlRequest, BlockedActorResponse, BlockedDomainResponse,
AddBlockedDomainRequest, FollowRequest, RemoteActorDto,
};
struct SecurityAddon;
@@ -57,6 +60,8 @@ impl Modify for SecurityAddon {
crate::handlers::import::api_get_profiles,
crate::handlers::import::api_post_profile,
crate::handlers::import::api_delete_profile,
crate::handlers::api::get_profile,
crate::handlers::api::update_profile_handler,
),
components(schemas(
DiaryResponse,
@@ -82,6 +87,7 @@ impl Modify for SecurityAddon {
MonthlyRatingDto,
DirectorStatDto,
UserTrendsDto,
ProfileResponse,
SessionCreatedResponse,
SessionStateResponse,
ApiFieldMapping,
@@ -122,6 +128,14 @@ pub struct ApiDoc;
crate::handlers::api::accept_follower,
crate::handlers::api::reject_follower,
crate::handlers::api::remove_follower,
crate::handlers::api::get_profile,
crate::handlers::api::update_profile_handler,
crate::handlers::api::get_blocked_domains_admin,
crate::handlers::api::add_blocked_domain_admin,
crate::handlers::api::remove_blocked_domain_admin,
crate::handlers::api::block_actor_api,
crate::handlers::api::unblock_actor_api,
crate::handlers::api::get_blocked_actors_api,
crate::handlers::import::api_post_session,
crate::handlers::import::api_get_session,
crate::handlers::import::api_put_mapping,
@@ -148,6 +162,10 @@ pub struct ApiDoc;
RemoteActorDto,
FollowRequest,
ActorUrlRequest,
ProfileResponse,
BlockedDomainResponse,
AddBlockedDomainRequest,
BlockedActorResponse,
ActivityFeedResponse,
FeedEntryDto,
UsersResponse,

View File

@@ -65,12 +65,13 @@ async fn main() -> anyhow::Result<()> {
// Clone what federation handler needs before ctx and app_config are consumed.
#[cfg(feature = "federation")]
let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url) = (
let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url, allow_registration) = (
Arc::clone(&movie_repository),
Arc::clone(&review_repository),
Arc::clone(&diary_repository),
Arc::clone(&user_repository),
app_config.base_url.clone(),
app_config.allow_registration,
);
let ctx = AppContext {
@@ -140,6 +141,7 @@ async fn main() -> anyhow::Result<()> {
fed_review_repo,
fed_diary_repo,
base_url,
allow_registration,
).await?.event_handler;
tracing::info!("federation event handler registered");