Compare commits
11 Commits
c30243f1c8
...
be4a37546c
| Author | SHA1 | Date | |
|---|---|---|---|
| be4a37546c | |||
| 7a2d8308d9 | |||
| 4a5d5df884 | |||
| 421cb463e3 | |||
| 925f4f8bf3 | |||
| e5c8380ba7 | |||
| 97bc918bbc | |||
| 805240aaf8 | |||
| cd6148eff9 | |||
| 6f1a0572df | |||
| 0841554dbe |
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -2015,8 +2015,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "k-ap"
|
||||
version = "0.1.9"
|
||||
source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.9#432f39cbb4f8d74255a1f614a9bb7c8bbfe11cde"
|
||||
version = "0.1.10"
|
||||
source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.10#d80cfd0431205498161db8665fd884710866ca95"
|
||||
dependencies = [
|
||||
"activitypub_federation",
|
||||
"anyhow",
|
||||
@@ -4566,7 +4566,7 @@ version = "0.1.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
|
||||
dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -4,7 +4,7 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" }
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||
domain = { workspace = true }
|
||||
url = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
@@ -807,6 +807,28 @@ impl FederationFollowRequestPort for ApFederationAdapter {
|
||||
.await
|
||||
.map_err(|e| DomainError::ExternalService(e.to_string()))
|
||||
}
|
||||
|
||||
async fn mark_follower_accepted(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
actor_url: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
self.inner
|
||||
.mark_follower_accepted(user_id.as_uuid(), actor_url)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
async fn mark_follower_rejected(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
actor_url: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
self.inner
|
||||
.mark_follower_rejected(user_id.as_uuid(), actor_url)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
// FederationActionPort is a blanket supertrait; no explicit impl needed.
|
||||
|
||||
@@ -71,6 +71,18 @@ pub enum EventPayload {
|
||||
ProfileUpdated {
|
||||
user_id: String,
|
||||
},
|
||||
RemoteFollowAccepted {
|
||||
local_user_id: String,
|
||||
remote_actor_url: String,
|
||||
},
|
||||
RemoteFollowRejected {
|
||||
local_user_id: String,
|
||||
remote_actor_url: String,
|
||||
},
|
||||
ActorMoved {
|
||||
user_id: String,
|
||||
new_actor_url: String,
|
||||
},
|
||||
MentionReceived {
|
||||
thought_id: String,
|
||||
mentioned_user_id: String,
|
||||
@@ -97,6 +109,9 @@ impl EventPayload {
|
||||
Self::UserUnblocked { .. } => "users.unblocked",
|
||||
Self::UserRegistered { .. } => "users.registered",
|
||||
Self::ProfileUpdated { .. } => "users.profile_updated",
|
||||
Self::RemoteFollowAccepted { .. } => "federation.follow.accepted",
|
||||
Self::RemoteFollowRejected { .. } => "federation.follow.rejected",
|
||||
Self::ActorMoved { .. } => "federation.actor.moved",
|
||||
Self::MentionReceived { .. } => "mentions.received",
|
||||
}
|
||||
}
|
||||
@@ -210,6 +225,27 @@ impl From<&DomainEvent> for EventPayload {
|
||||
DomainEvent::ProfileUpdated { user_id } => Self::ProfileUpdated {
|
||||
user_id: user_id.to_string(),
|
||||
},
|
||||
DomainEvent::RemoteFollowAccepted {
|
||||
local_user_id,
|
||||
remote_actor_url,
|
||||
} => Self::RemoteFollowAccepted {
|
||||
local_user_id: local_user_id.to_string(),
|
||||
remote_actor_url: remote_actor_url.clone(),
|
||||
},
|
||||
DomainEvent::RemoteFollowRejected {
|
||||
local_user_id,
|
||||
remote_actor_url,
|
||||
} => Self::RemoteFollowRejected {
|
||||
local_user_id: local_user_id.to_string(),
|
||||
remote_actor_url: remote_actor_url.clone(),
|
||||
},
|
||||
DomainEvent::ActorMoved {
|
||||
user_id,
|
||||
new_actor_url,
|
||||
} => Self::ActorMoved {
|
||||
user_id: user_id.to_string(),
|
||||
new_actor_url: new_actor_url.clone(),
|
||||
},
|
||||
DomainEvent::MentionReceived {
|
||||
thought_id,
|
||||
mentioned_user_id,
|
||||
@@ -340,6 +376,27 @@ impl TryFrom<EventPayload> for DomainEvent {
|
||||
EventPayload::ProfileUpdated { user_id } => DomainEvent::ProfileUpdated {
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
},
|
||||
EventPayload::RemoteFollowAccepted {
|
||||
local_user_id,
|
||||
remote_actor_url,
|
||||
} => DomainEvent::RemoteFollowAccepted {
|
||||
local_user_id: UserId::from_uuid(parse_uuid(&local_user_id, "local_user_id")?),
|
||||
remote_actor_url,
|
||||
},
|
||||
EventPayload::RemoteFollowRejected {
|
||||
local_user_id,
|
||||
remote_actor_url,
|
||||
} => DomainEvent::RemoteFollowRejected {
|
||||
local_user_id: UserId::from_uuid(parse_uuid(&local_user_id, "local_user_id")?),
|
||||
remote_actor_url,
|
||||
},
|
||||
EventPayload::ActorMoved {
|
||||
user_id,
|
||||
new_actor_url,
|
||||
} => DomainEvent::ActorMoved {
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
new_actor_url,
|
||||
},
|
||||
EventPayload::MentionReceived {
|
||||
thought_id,
|
||||
mentioned_user_id,
|
||||
|
||||
@@ -4,7 +4,7 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" }
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||
sqlx = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
@@ -32,6 +32,9 @@ fn aggregate_id(event: &DomainEvent) -> Uuid {
|
||||
DomainEvent::UserUnblocked { blocker_id, .. } => blocker_id.as_uuid(),
|
||||
DomainEvent::UserRegistered { user_id } => user_id.as_uuid(),
|
||||
DomainEvent::ProfileUpdated { user_id } => user_id.as_uuid(),
|
||||
DomainEvent::RemoteFollowAccepted { local_user_id, .. } => local_user_id.as_uuid(),
|
||||
DomainEvent::RemoteFollowRejected { local_user_id, .. } => local_user_id.as_uuid(),
|
||||
DomainEvent::ActorMoved { user_id, .. } => user_id.as_uuid(),
|
||||
DomainEvent::MentionReceived { thought_id, .. } => thought_id.as_uuid(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::FederationActionPort};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct FederationManagementEventService {
|
||||
pub federation: Arc<dyn FederationActionPort>,
|
||||
}
|
||||
|
||||
impl FederationManagementEventService {
|
||||
pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
match event {
|
||||
DomainEvent::RemoteFollowAccepted {
|
||||
local_user_id,
|
||||
remote_actor_url,
|
||||
} => {
|
||||
tracing::info!(
|
||||
local_user_id = %local_user_id,
|
||||
actor = %remote_actor_url,
|
||||
"federation-mgmt: accepting follow — sending Accept + backfill"
|
||||
);
|
||||
self.federation
|
||||
.accept_follow_request(local_user_id, remote_actor_url)
|
||||
.await
|
||||
}
|
||||
DomainEvent::RemoteFollowRejected {
|
||||
local_user_id,
|
||||
remote_actor_url,
|
||||
} => {
|
||||
tracing::info!(
|
||||
local_user_id = %local_user_id,
|
||||
actor = %remote_actor_url,
|
||||
"federation-mgmt: rejecting follow — sending Reject"
|
||||
);
|
||||
self.federation
|
||||
.reject_follow_request(local_user_id, remote_actor_url)
|
||||
.await
|
||||
}
|
||||
DomainEvent::ActorMoved {
|
||||
user_id,
|
||||
new_actor_url,
|
||||
} => {
|
||||
tracing::info!(
|
||||
user_id = %user_id,
|
||||
target = %new_actor_url,
|
||||
"federation-mgmt: broadcasting Move"
|
||||
);
|
||||
let url = url::Url::parse(new_actor_url)
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
self.federation
|
||||
.broadcast_move(user_id, url)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
pub mod federation_event;
|
||||
pub mod federation_management_event;
|
||||
pub mod notification_event;
|
||||
|
||||
pub use federation_event::FederationEventService;
|
||||
pub use federation_management_event::FederationManagementEventService;
|
||||
pub use notification_event::NotificationEventService;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use activitypub::ActivityPubRepository;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::{
|
||||
actor_connection_summary::ActorConnectionSummary,
|
||||
feed::{FeedEntry, PageParams, Paginated},
|
||||
@@ -16,6 +17,20 @@ use domain::{
|
||||
|
||||
use super::social;
|
||||
|
||||
pub async fn initiate_actor_move(
|
||||
events: &dyn EventPublisher,
|
||||
user_id: &UserId,
|
||||
new_actor_url: url::Url,
|
||||
) -> Result<(), DomainError> {
|
||||
events
|
||||
.publish(&DomainEvent::ActorMoved {
|
||||
user_id: user_id.clone(),
|
||||
new_actor_url: new_actor_url.to_string(),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
pub async fn list_pending_requests(
|
||||
federation: &dyn FederationFollowRequestPort,
|
||||
user_id: &UserId,
|
||||
@@ -25,18 +40,38 @@ pub async fn list_pending_requests(
|
||||
|
||||
pub async fn accept_follow_request(
|
||||
federation: &dyn FederationFollowRequestPort,
|
||||
events: &dyn EventPublisher,
|
||||
user_id: &UserId,
|
||||
actor_url: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
federation.accept_follow_request(user_id, actor_url).await
|
||||
federation
|
||||
.mark_follower_accepted(user_id, actor_url)
|
||||
.await?;
|
||||
events
|
||||
.publish(&DomainEvent::RemoteFollowAccepted {
|
||||
local_user_id: user_id.clone(),
|
||||
remote_actor_url: actor_url.to_string(),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
pub async fn reject_follow_request(
|
||||
federation: &dyn FederationFollowRequestPort,
|
||||
events: &dyn EventPublisher,
|
||||
user_id: &UserId,
|
||||
actor_url: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
federation.reject_follow_request(user_id, actor_url).await
|
||||
federation
|
||||
.mark_follower_rejected(user_id, actor_url)
|
||||
.await?;
|
||||
events
|
||||
.publish(&DomainEvent::RemoteFollowRejected {
|
||||
local_user_id: user_id.clone(),
|
||||
remote_actor_url: actor_url.to_string(),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
pub async fn list_remote_followers(
|
||||
|
||||
@@ -13,7 +13,7 @@ async fn list_pending_returns_empty_by_default() {
|
||||
async fn accept_follow_request_returns_ok() {
|
||||
let store = TestStore::default();
|
||||
let uid = UserId::new();
|
||||
accept_follow_request(&store, &uid, "https://mastodon.social/users/alice")
|
||||
accept_follow_request(&store, &store, &uid, "https://mastodon.social/users/alice")
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -22,7 +22,7 @@ async fn accept_follow_request_returns_ok() {
|
||||
async fn reject_follow_request_returns_ok() {
|
||||
let store = TestStore::default();
|
||||
let uid = UserId::new();
|
||||
reject_follow_request(&store, &uid, "https://mastodon.social/users/alice")
|
||||
reject_follow_request(&store, &store, &uid, "https://mastodon.social/users/alice")
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ postgres = { workspace = true }
|
||||
postgres-search = { workspace = true }
|
||||
postgres-federation = { workspace = true }
|
||||
activitypub = { workspace = true }
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" }
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||
nats = { workspace = true }
|
||||
event-transport = { workspace = true }
|
||||
auth = { workspace = true }
|
||||
|
||||
@@ -63,6 +63,18 @@ pub enum DomainEvent {
|
||||
ProfileUpdated {
|
||||
user_id: UserId,
|
||||
},
|
||||
RemoteFollowAccepted {
|
||||
local_user_id: UserId,
|
||||
remote_actor_url: String,
|
||||
},
|
||||
RemoteFollowRejected {
|
||||
local_user_id: UserId,
|
||||
remote_actor_url: String,
|
||||
},
|
||||
ActorMoved {
|
||||
user_id: UserId,
|
||||
new_actor_url: String,
|
||||
},
|
||||
MentionReceived {
|
||||
thought_id: ThoughtId,
|
||||
mentioned_user_id: UserId,
|
||||
|
||||
@@ -322,6 +322,20 @@ pub trait FederationFollowRequestPort: Send + Sync {
|
||||
user_id: &UserId,
|
||||
actor_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Update follower status to Accepted in DB only — no federation activity sent.
|
||||
async fn mark_follower_accepted(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
actor_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Remove follower from DB only — no federation activity sent.
|
||||
async fn mark_follower_rejected(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
actor_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -763,6 +763,22 @@ impl FederationFollowRequestPort for TestStore {
|
||||
) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_follower_accepted(
|
||||
&self,
|
||||
_user_id: &UserId,
|
||||
_actor_url: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_follower_rejected(
|
||||
&self,
|
||||
_user_id: &UserId,
|
||||
_actor_url: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -5,8 +5,8 @@ use crate::{
|
||||
};
|
||||
use api_types::responses::{ProfileField, RemoteActorResponse};
|
||||
use application::use_cases::federation_management::{
|
||||
accept_follow_request, list_pending_requests, list_remote_followers, list_remote_following,
|
||||
reject_follow_request, remove_remote_following,
|
||||
accept_follow_request, initiate_actor_move, list_pending_requests, list_remote_followers,
|
||||
list_remote_following, reject_follow_request, remove_remote_following,
|
||||
};
|
||||
use axum::{http::StatusCode, Json};
|
||||
use domain::ports::{EventPublisher, FederationActionPort, FollowRepository, UserRepository};
|
||||
@@ -67,7 +67,7 @@ pub async fn post_accept_request(
|
||||
AuthUser(uid): AuthUser,
|
||||
Json(body): Json<ActorUrlBody>,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
accept_follow_request(&*d.federation, &uid, &body.actor_url).await?;
|
||||
accept_follow_request(&*d.federation, &*d.events, &uid, &body.actor_url).await?;
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
@@ -76,7 +76,7 @@ pub async fn delete_follower(
|
||||
AuthUser(uid): AuthUser,
|
||||
Json(body): Json<ActorUrlBody>,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
reject_follow_request(&*d.federation, &uid, &body.actor_url).await?;
|
||||
reject_follow_request(&*d.federation, &*d.events, &uid, &body.actor_url).await?;
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
@@ -120,10 +120,7 @@ pub async fn post_move_account(
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
let new_url = url::Url::parse(&body.new_actor_url)
|
||||
.map_err(|_| ApiError::BadRequest("invalid new_actor_url".into()))?;
|
||||
d.federation
|
||||
.broadcast_move(&uid, new_url)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
initiate_actor_move(&*d.events, &uid, new_url).await?;
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ application = { workspace = true }
|
||||
nats = { workspace = true }
|
||||
event-transport = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" }
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||
activitypub = { workspace = true }
|
||||
postgres = { workspace = true }
|
||||
postgres-federation = { workspace = true }
|
||||
|
||||
@@ -5,17 +5,20 @@ use std::sync::Arc;
|
||||
|
||||
use activitypub::{ActivityPubRepository, OutboundFederationPort};
|
||||
use activitypub::{ApFederationAdapter, ThoughtsObjectHandler};
|
||||
use application::services::{FederationEventService, NotificationEventService};
|
||||
use application::services::{
|
||||
FederationEventService, FederationManagementEventService, NotificationEventService,
|
||||
};
|
||||
use domain::ports::EventPublisher;
|
||||
use k_ap::ActivityPubService;
|
||||
use postgres::activitypub::PgActivityPubRepository;
|
||||
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
||||
|
||||
use crate::handlers::{FederationHandler, NotificationHandler};
|
||||
use crate::handlers::{FederationHandler, FederationManagementHandler, NotificationHandler};
|
||||
|
||||
pub struct WorkerHandlers {
|
||||
pub notification: NotificationHandler,
|
||||
pub federation: FederationHandler,
|
||||
pub federation_management: FederationManagementHandler,
|
||||
}
|
||||
|
||||
pub struct WorkerInfra {
|
||||
@@ -80,6 +83,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
||||
base_url: base_url.to_string(),
|
||||
ap_repo: ap_repo_worker,
|
||||
});
|
||||
let federation_management_svc = Arc::new(FederationManagementEventService {
|
||||
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
|
||||
});
|
||||
|
||||
// Thin handlers
|
||||
let handlers = WorkerHandlers {
|
||||
@@ -89,6 +95,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
||||
federation: FederationHandler {
|
||||
service: federation_svc,
|
||||
},
|
||||
federation_management: FederationManagementHandler {
|
||||
service: federation_management_svc,
|
||||
},
|
||||
};
|
||||
|
||||
// DLQ store
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use application::services::{FederationEventService, NotificationEventService};
|
||||
use application::services::{
|
||||
FederationEventService, FederationManagementEventService, NotificationEventService,
|
||||
};
|
||||
use domain::{errors::DomainError, events::DomainEvent};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -21,3 +23,13 @@ impl FederationHandler {
|
||||
self.service.process(event).await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FederationManagementHandler {
|
||||
pub service: Arc<FederationManagementEventService>,
|
||||
}
|
||||
|
||||
impl FederationManagementHandler {
|
||||
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
self.service.process(event).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,8 +52,9 @@ async fn main() {
|
||||
|
||||
let n = infra.handlers.notification.handle(event).await;
|
||||
let f = infra.handlers.federation.handle(event).await;
|
||||
let fm = infra.handlers.federation_management.handle(event).await;
|
||||
|
||||
if n.is_ok() && f.is_ok() {
|
||||
if n.is_ok() && f.is_ok() && fm.is_ok() {
|
||||
(envelope.ack)();
|
||||
tracing::info!(event_type, "event handled ok");
|
||||
} else {
|
||||
@@ -63,6 +64,9 @@ async fn main() {
|
||||
if let Err(e) = &f {
|
||||
tracing::error!("federation handler: {e}");
|
||||
}
|
||||
if let Err(e) = &fm {
|
||||
tracing::error!("federation management handler: {e}");
|
||||
}
|
||||
|
||||
// Last delivery attempt -> move to DLQ then ack.
|
||||
// Earlier attempts -> nack so NATS retries.
|
||||
@@ -70,6 +74,7 @@ async fn main() {
|
||||
let error_msg = n
|
||||
.err()
|
||||
.or(f.err())
|
||||
.or(fm.err())
|
||||
.map(|e| e.to_string())
|
||||
.unwrap_or_else(|| "unknown error".into());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user