Compare commits

..

11 Commits

Author SHA1 Message Date
be4a37546c refactor: delegate mark_follower_accepted/rejected through k-ap service, remove federation_repo from ApFederationAdapter
Some checks failed
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (push) Has been cancelled
2026-05-28 02:45:59 +02:00
7a2d8308d9 fix: extract initiate_actor_move use case — remove event publish from handler 2026-05-28 02:41:42 +02:00
4a5d5df884 fix(tests): update federation_management tests for EventPublisher arg 2026-05-28 02:34:30 +02:00
421cb463e3 feat: split accept/reject into DB+event; broadcast_move via event in API 2026-05-28 02:32:50 +02:00
925f4f8bf3 feat(worker): add FederationManagementHandler and wire into event loop 2026-05-28 02:30:22 +02:00
e5c8380ba7 feat(application): add FederationManagementEventService 2026-05-28 02:28:15 +02:00
97bc918bbc fix(bootstrap,worker): pass shared federation_repo to ApFederationAdapter 2026-05-28 02:26:57 +02:00
805240aaf8 feat(activitypub): add federation_repo field and thin DB-only methods to ApFederationAdapter 2026-05-28 02:24:43 +02:00
cd6148eff9 feat(domain): add mark_follower_accepted/rejected thin port methods 2026-05-28 02:22:52 +02:00
6f1a0572df fix(event-payload): correct NATS subjects for federation events 2026-05-28 02:20:43 +02:00
0841554dbe feat(domain): add RemoteFollowAccepted, RemoteFollowRejected, ActorMoved events 2026-05-28 02:19:46 +02:00
19 changed files with 263 additions and 23 deletions

6
Cargo.lock generated
View File

@@ -2015,8 +2015,8 @@ dependencies = [
[[package]] [[package]]
name = "k-ap" name = "k-ap"
version = "0.1.9" version = "0.1.10"
source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.9#432f39cbb4f8d74255a1f614a9bb7c8bbfe11cde" source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.10#d80cfd0431205498161db8665fd884710866ca95"
dependencies = [ dependencies = [
"activitypub_federation", "activitypub_federation",
"anyhow", "anyhow",
@@ -4566,7 +4566,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.48.0",
] ]
[[package]] [[package]]

View File

@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" } k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
domain = { workspace = true } domain = { workspace = true }
url = { workspace = true } url = { workspace = true }
serde = { workspace = true } serde = { workspace = true }

View File

@@ -807,6 +807,28 @@ impl FederationFollowRequestPort for ApFederationAdapter {
.await .await
.map_err(|e| DomainError::ExternalService(e.to_string())) .map_err(|e| DomainError::ExternalService(e.to_string()))
} }
async fn mark_follower_accepted(
&self,
user_id: &UserId,
actor_url: &str,
) -> Result<(), DomainError> {
self.inner
.mark_follower_accepted(user_id.as_uuid(), actor_url)
.await
.map_err(|e| DomainError::Internal(e.to_string()))
}
async fn mark_follower_rejected(
&self,
user_id: &UserId,
actor_url: &str,
) -> Result<(), DomainError> {
self.inner
.mark_follower_rejected(user_id.as_uuid(), actor_url)
.await
.map_err(|e| DomainError::Internal(e.to_string()))
}
} }
// FederationActionPort is a blanket supertrait; no explicit impl needed. // FederationActionPort is a blanket supertrait; no explicit impl needed.

View File

@@ -71,6 +71,18 @@ pub enum EventPayload {
ProfileUpdated { ProfileUpdated {
user_id: String, user_id: String,
}, },
RemoteFollowAccepted {
local_user_id: String,
remote_actor_url: String,
},
RemoteFollowRejected {
local_user_id: String,
remote_actor_url: String,
},
ActorMoved {
user_id: String,
new_actor_url: String,
},
MentionReceived { MentionReceived {
thought_id: String, thought_id: String,
mentioned_user_id: String, mentioned_user_id: String,
@@ -97,6 +109,9 @@ impl EventPayload {
Self::UserUnblocked { .. } => "users.unblocked", Self::UserUnblocked { .. } => "users.unblocked",
Self::UserRegistered { .. } => "users.registered", Self::UserRegistered { .. } => "users.registered",
Self::ProfileUpdated { .. } => "users.profile_updated", Self::ProfileUpdated { .. } => "users.profile_updated",
Self::RemoteFollowAccepted { .. } => "federation.follow.accepted",
Self::RemoteFollowRejected { .. } => "federation.follow.rejected",
Self::ActorMoved { .. } => "federation.actor.moved",
Self::MentionReceived { .. } => "mentions.received", Self::MentionReceived { .. } => "mentions.received",
} }
} }
@@ -210,6 +225,27 @@ impl From<&DomainEvent> for EventPayload {
DomainEvent::ProfileUpdated { user_id } => Self::ProfileUpdated { DomainEvent::ProfileUpdated { user_id } => Self::ProfileUpdated {
user_id: user_id.to_string(), user_id: user_id.to_string(),
}, },
DomainEvent::RemoteFollowAccepted {
local_user_id,
remote_actor_url,
} => Self::RemoteFollowAccepted {
local_user_id: local_user_id.to_string(),
remote_actor_url: remote_actor_url.clone(),
},
DomainEvent::RemoteFollowRejected {
local_user_id,
remote_actor_url,
} => Self::RemoteFollowRejected {
local_user_id: local_user_id.to_string(),
remote_actor_url: remote_actor_url.clone(),
},
DomainEvent::ActorMoved {
user_id,
new_actor_url,
} => Self::ActorMoved {
user_id: user_id.to_string(),
new_actor_url: new_actor_url.clone(),
},
DomainEvent::MentionReceived { DomainEvent::MentionReceived {
thought_id, thought_id,
mentioned_user_id, mentioned_user_id,
@@ -340,6 +376,27 @@ impl TryFrom<EventPayload> for DomainEvent {
EventPayload::ProfileUpdated { user_id } => DomainEvent::ProfileUpdated { EventPayload::ProfileUpdated { user_id } => DomainEvent::ProfileUpdated {
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
}, },
EventPayload::RemoteFollowAccepted {
local_user_id,
remote_actor_url,
} => DomainEvent::RemoteFollowAccepted {
local_user_id: UserId::from_uuid(parse_uuid(&local_user_id, "local_user_id")?),
remote_actor_url,
},
EventPayload::RemoteFollowRejected {
local_user_id,
remote_actor_url,
} => DomainEvent::RemoteFollowRejected {
local_user_id: UserId::from_uuid(parse_uuid(&local_user_id, "local_user_id")?),
remote_actor_url,
},
EventPayload::ActorMoved {
user_id,
new_actor_url,
} => DomainEvent::ActorMoved {
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
new_actor_url,
},
EventPayload::MentionReceived { EventPayload::MentionReceived {
thought_id, thought_id,
mentioned_user_id, mentioned_user_id,

View File

@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" } k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
sqlx = { workspace = true } sqlx = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }

View File

@@ -32,6 +32,9 @@ fn aggregate_id(event: &DomainEvent) -> Uuid {
DomainEvent::UserUnblocked { blocker_id, .. } => blocker_id.as_uuid(), DomainEvent::UserUnblocked { blocker_id, .. } => blocker_id.as_uuid(),
DomainEvent::UserRegistered { user_id } => user_id.as_uuid(), DomainEvent::UserRegistered { user_id } => user_id.as_uuid(),
DomainEvent::ProfileUpdated { user_id } => user_id.as_uuid(), DomainEvent::ProfileUpdated { user_id } => user_id.as_uuid(),
DomainEvent::RemoteFollowAccepted { local_user_id, .. } => local_user_id.as_uuid(),
DomainEvent::RemoteFollowRejected { local_user_id, .. } => local_user_id.as_uuid(),
DomainEvent::ActorMoved { user_id, .. } => user_id.as_uuid(),
DomainEvent::MentionReceived { thought_id, .. } => thought_id.as_uuid(), DomainEvent::MentionReceived { thought_id, .. } => thought_id.as_uuid(),
} }
} }

View File

@@ -0,0 +1,56 @@
use domain::{errors::DomainError, events::DomainEvent, ports::FederationActionPort};
use std::sync::Arc;
pub struct FederationManagementEventService {
pub federation: Arc<dyn FederationActionPort>,
}
impl FederationManagementEventService {
pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> {
match event {
DomainEvent::RemoteFollowAccepted {
local_user_id,
remote_actor_url,
} => {
tracing::info!(
local_user_id = %local_user_id,
actor = %remote_actor_url,
"federation-mgmt: accepting follow — sending Accept + backfill"
);
self.federation
.accept_follow_request(local_user_id, remote_actor_url)
.await
}
DomainEvent::RemoteFollowRejected {
local_user_id,
remote_actor_url,
} => {
tracing::info!(
local_user_id = %local_user_id,
actor = %remote_actor_url,
"federation-mgmt: rejecting follow — sending Reject"
);
self.federation
.reject_follow_request(local_user_id, remote_actor_url)
.await
}
DomainEvent::ActorMoved {
user_id,
new_actor_url,
} => {
tracing::info!(
user_id = %user_id,
target = %new_actor_url,
"federation-mgmt: broadcasting Move"
);
let url = url::Url::parse(new_actor_url)
.map_err(|e| DomainError::Internal(e.to_string()))?;
self.federation
.broadcast_move(user_id, url)
.await
.map_err(|e| DomainError::Internal(e.to_string()))
}
_ => Ok(()),
}
}
}

View File

@@ -1,5 +1,7 @@
pub mod federation_event; pub mod federation_event;
pub mod federation_management_event;
pub mod notification_event; pub mod notification_event;
pub use federation_event::FederationEventService; pub use federation_event::FederationEventService;
pub use federation_management_event::FederationManagementEventService;
pub use notification_event::NotificationEventService; pub use notification_event::NotificationEventService;

View File

@@ -1,6 +1,7 @@
use activitypub::ActivityPubRepository; use activitypub::ActivityPubRepository;
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
events::DomainEvent,
models::{ models::{
actor_connection_summary::ActorConnectionSummary, actor_connection_summary::ActorConnectionSummary,
feed::{FeedEntry, PageParams, Paginated}, feed::{FeedEntry, PageParams, Paginated},
@@ -16,6 +17,20 @@ use domain::{
use super::social; use super::social;
pub async fn initiate_actor_move(
events: &dyn EventPublisher,
user_id: &UserId,
new_actor_url: url::Url,
) -> Result<(), DomainError> {
events
.publish(&DomainEvent::ActorMoved {
user_id: user_id.clone(),
new_actor_url: new_actor_url.to_string(),
})
.await
.map_err(|e| DomainError::Internal(e.to_string()))
}
pub async fn list_pending_requests( pub async fn list_pending_requests(
federation: &dyn FederationFollowRequestPort, federation: &dyn FederationFollowRequestPort,
user_id: &UserId, user_id: &UserId,
@@ -25,18 +40,38 @@ pub async fn list_pending_requests(
pub async fn accept_follow_request( pub async fn accept_follow_request(
federation: &dyn FederationFollowRequestPort, federation: &dyn FederationFollowRequestPort,
events: &dyn EventPublisher,
user_id: &UserId, user_id: &UserId,
actor_url: &str, actor_url: &str,
) -> Result<(), DomainError> { ) -> Result<(), DomainError> {
federation.accept_follow_request(user_id, actor_url).await federation
.mark_follower_accepted(user_id, actor_url)
.await?;
events
.publish(&DomainEvent::RemoteFollowAccepted {
local_user_id: user_id.clone(),
remote_actor_url: actor_url.to_string(),
})
.await
.map_err(|e| DomainError::Internal(e.to_string()))
} }
pub async fn reject_follow_request( pub async fn reject_follow_request(
federation: &dyn FederationFollowRequestPort, federation: &dyn FederationFollowRequestPort,
events: &dyn EventPublisher,
user_id: &UserId, user_id: &UserId,
actor_url: &str, actor_url: &str,
) -> Result<(), DomainError> { ) -> Result<(), DomainError> {
federation.reject_follow_request(user_id, actor_url).await federation
.mark_follower_rejected(user_id, actor_url)
.await?;
events
.publish(&DomainEvent::RemoteFollowRejected {
local_user_id: user_id.clone(),
remote_actor_url: actor_url.to_string(),
})
.await
.map_err(|e| DomainError::Internal(e.to_string()))
} }
pub async fn list_remote_followers( pub async fn list_remote_followers(

View File

@@ -13,7 +13,7 @@ async fn list_pending_returns_empty_by_default() {
async fn accept_follow_request_returns_ok() { async fn accept_follow_request_returns_ok() {
let store = TestStore::default(); let store = TestStore::default();
let uid = UserId::new(); let uid = UserId::new();
accept_follow_request(&store, &uid, "https://mastodon.social/users/alice") accept_follow_request(&store, &store, &uid, "https://mastodon.social/users/alice")
.await .await
.unwrap(); .unwrap();
} }
@@ -22,7 +22,7 @@ async fn accept_follow_request_returns_ok() {
async fn reject_follow_request_returns_ok() { async fn reject_follow_request_returns_ok() {
let store = TestStore::default(); let store = TestStore::default();
let uid = UserId::new(); let uid = UserId::new();
reject_follow_request(&store, &uid, "https://mastodon.social/users/alice") reject_follow_request(&store, &store, &uid, "https://mastodon.social/users/alice")
.await .await
.unwrap(); .unwrap();
} }

View File

@@ -14,7 +14,7 @@ postgres = { workspace = true }
postgres-search = { workspace = true } postgres-search = { workspace = true }
postgres-federation = { workspace = true } postgres-federation = { workspace = true }
activitypub = { workspace = true } activitypub = { workspace = true }
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" } k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
nats = { workspace = true } nats = { workspace = true }
event-transport = { workspace = true } event-transport = { workspace = true }
auth = { workspace = true } auth = { workspace = true }

View File

@@ -63,6 +63,18 @@ pub enum DomainEvent {
ProfileUpdated { ProfileUpdated {
user_id: UserId, user_id: UserId,
}, },
RemoteFollowAccepted {
local_user_id: UserId,
remote_actor_url: String,
},
RemoteFollowRejected {
local_user_id: UserId,
remote_actor_url: String,
},
ActorMoved {
user_id: UserId,
new_actor_url: String,
},
MentionReceived { MentionReceived {
thought_id: ThoughtId, thought_id: ThoughtId,
mentioned_user_id: UserId, mentioned_user_id: UserId,

View File

@@ -322,6 +322,20 @@ pub trait FederationFollowRequestPort: Send + Sync {
user_id: &UserId, user_id: &UserId,
actor_url: &str, actor_url: &str,
) -> Result<(), DomainError>; ) -> Result<(), DomainError>;
/// Update follower status to Accepted in DB only — no federation activity sent.
async fn mark_follower_accepted(
&self,
user_id: &UserId,
actor_url: &str,
) -> Result<(), DomainError>;
/// Remove follower from DB only — no federation activity sent.
async fn mark_follower_rejected(
&self,
user_id: &UserId,
actor_url: &str,
) -> Result<(), DomainError>;
} }
#[async_trait] #[async_trait]

View File

@@ -763,6 +763,22 @@ impl FederationFollowRequestPort for TestStore {
) -> Result<(), DomainError> { ) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn mark_follower_accepted(
&self,
_user_id: &UserId,
_actor_url: &str,
) -> Result<(), DomainError> {
Ok(())
}
async fn mark_follower_rejected(
&self,
_user_id: &UserId,
_actor_url: &str,
) -> Result<(), DomainError> {
Ok(())
}
} }
#[async_trait] #[async_trait]

View File

@@ -5,8 +5,8 @@ use crate::{
}; };
use api_types::responses::{ProfileField, RemoteActorResponse}; use api_types::responses::{ProfileField, RemoteActorResponse};
use application::use_cases::federation_management::{ use application::use_cases::federation_management::{
accept_follow_request, list_pending_requests, list_remote_followers, list_remote_following, accept_follow_request, initiate_actor_move, list_pending_requests, list_remote_followers,
reject_follow_request, remove_remote_following, list_remote_following, reject_follow_request, remove_remote_following,
}; };
use axum::{http::StatusCode, Json}; use axum::{http::StatusCode, Json};
use domain::ports::{EventPublisher, FederationActionPort, FollowRepository, UserRepository}; use domain::ports::{EventPublisher, FederationActionPort, FollowRepository, UserRepository};
@@ -67,7 +67,7 @@ pub async fn post_accept_request(
AuthUser(uid): AuthUser, AuthUser(uid): AuthUser,
Json(body): Json<ActorUrlBody>, Json(body): Json<ActorUrlBody>,
) -> Result<StatusCode, ApiError> { ) -> Result<StatusCode, ApiError> {
accept_follow_request(&*d.federation, &uid, &body.actor_url).await?; accept_follow_request(&*d.federation, &*d.events, &uid, &body.actor_url).await?;
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
@@ -76,7 +76,7 @@ pub async fn delete_follower(
AuthUser(uid): AuthUser, AuthUser(uid): AuthUser,
Json(body): Json<ActorUrlBody>, Json(body): Json<ActorUrlBody>,
) -> Result<StatusCode, ApiError> { ) -> Result<StatusCode, ApiError> {
reject_follow_request(&*d.federation, &uid, &body.actor_url).await?; reject_follow_request(&*d.federation, &*d.events, &uid, &body.actor_url).await?;
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
@@ -120,10 +120,7 @@ pub async fn post_move_account(
) -> Result<StatusCode, ApiError> { ) -> Result<StatusCode, ApiError> {
let new_url = url::Url::parse(&body.new_actor_url) let new_url = url::Url::parse(&body.new_actor_url)
.map_err(|_| ApiError::BadRequest("invalid new_actor_url".into()))?; .map_err(|_| ApiError::BadRequest("invalid new_actor_url".into()))?;
d.federation initiate_actor_move(&*d.events, &uid, new_url).await?;
.broadcast_move(&uid, new_url)
.await
.map_err(ApiError::from)?;
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }

View File

@@ -13,7 +13,7 @@ application = { workspace = true }
nats = { workspace = true } nats = { workspace = true }
event-transport = { workspace = true } event-transport = { workspace = true }
event-payload = { workspace = true } event-payload = { workspace = true }
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" } k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
activitypub = { workspace = true } activitypub = { workspace = true }
postgres = { workspace = true } postgres = { workspace = true }
postgres-federation = { workspace = true } postgres-federation = { workspace = true }

View File

@@ -5,17 +5,20 @@ use std::sync::Arc;
use activitypub::{ActivityPubRepository, OutboundFederationPort}; use activitypub::{ActivityPubRepository, OutboundFederationPort};
use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; use activitypub::{ApFederationAdapter, ThoughtsObjectHandler};
use application::services::{FederationEventService, NotificationEventService}; use application::services::{
FederationEventService, FederationManagementEventService, NotificationEventService,
};
use domain::ports::EventPublisher; use domain::ports::EventPublisher;
use k_ap::ActivityPubService; use k_ap::ActivityPubService;
use postgres::activitypub::PgActivityPubRepository; use postgres::activitypub::PgActivityPubRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
use crate::handlers::{FederationHandler, NotificationHandler}; use crate::handlers::{FederationHandler, FederationManagementHandler, NotificationHandler};
pub struct WorkerHandlers { pub struct WorkerHandlers {
pub notification: NotificationHandler, pub notification: NotificationHandler,
pub federation: FederationHandler, pub federation: FederationHandler,
pub federation_management: FederationManagementHandler,
} }
pub struct WorkerInfra { pub struct WorkerInfra {
@@ -80,6 +83,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
base_url: base_url.to_string(), base_url: base_url.to_string(),
ap_repo: ap_repo_worker, ap_repo: ap_repo_worker,
}); });
let federation_management_svc = Arc::new(FederationManagementEventService {
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
});
// Thin handlers // Thin handlers
let handlers = WorkerHandlers { let handlers = WorkerHandlers {
@@ -89,6 +95,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
federation: FederationHandler { federation: FederationHandler {
service: federation_svc, service: federation_svc,
}, },
federation_management: FederationManagementHandler {
service: federation_management_svc,
},
}; };
// DLQ store // DLQ store

View File

@@ -1,4 +1,6 @@
use application::services::{FederationEventService, NotificationEventService}; use application::services::{
FederationEventService, FederationManagementEventService, NotificationEventService,
};
use domain::{errors::DomainError, events::DomainEvent}; use domain::{errors::DomainError, events::DomainEvent};
use std::sync::Arc; use std::sync::Arc;
@@ -21,3 +23,13 @@ impl FederationHandler {
self.service.process(event).await self.service.process(event).await
} }
} }
pub struct FederationManagementHandler {
pub service: Arc<FederationManagementEventService>,
}
impl FederationManagementHandler {
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
self.service.process(event).await
}
}

View File

@@ -52,8 +52,9 @@ async fn main() {
let n = infra.handlers.notification.handle(event).await; let n = infra.handlers.notification.handle(event).await;
let f = infra.handlers.federation.handle(event).await; let f = infra.handlers.federation.handle(event).await;
let fm = infra.handlers.federation_management.handle(event).await;
if n.is_ok() && f.is_ok() { if n.is_ok() && f.is_ok() && fm.is_ok() {
(envelope.ack)(); (envelope.ack)();
tracing::info!(event_type, "event handled ok"); tracing::info!(event_type, "event handled ok");
} else { } else {
@@ -63,6 +64,9 @@ async fn main() {
if let Err(e) = &f { if let Err(e) = &f {
tracing::error!("federation handler: {e}"); tracing::error!("federation handler: {e}");
} }
if let Err(e) = &fm {
tracing::error!("federation management handler: {e}");
}
// Last delivery attempt -> move to DLQ then ack. // Last delivery attempt -> move to DLQ then ack.
// Earlier attempts -> nack so NATS retries. // Earlier attempts -> nack so NATS retries.
@@ -70,6 +74,7 @@ async fn main() {
let error_msg = n let error_msg = n
.err() .err()
.or(f.err()) .or(f.err())
.or(fm.err())
.map(|e| e.to_string()) .map(|e| e.to_string())
.unwrap_or_else(|| "unknown error".into()); .unwrap_or_else(|| "unknown error".into());